From 727ebddc1d266dce7b5f4c4fb577b0eeb0d64e91 Mon Sep 17 00:00:00 2001 From: Bodong Yang <86948717+Bodong-Yang@users.noreply.github.com> Date: Thu, 25 Apr 2024 12:03:09 +0900 Subject: [PATCH] refine(v3.7.x): refine bootctrl.common CMDHelperFuncs (#289) This PR does the following things: 1. refines the common module's subprocess wrapper methods. 2. refines and simplify the boot_control.common.CMDHelperFuncs implementation a lot. 3. cleanup and remove boot_contro.errors, now each boot controller implementation has their own error types. 4. use mount slot helper's prepare_standby_dev instead of each boot controller's prepare_standby_dev method. 5. integrate the new CMDHelperFuncs to each boot controller and fix up test files accordingly. NOTE that cboot boot controller implementation is temporarily removed, it will be added back in #287 . --- otaclient/app/boot_control/_cboot.py | 579 ------------------ otaclient/app/boot_control/_common.py | 740 ++++++++++-------------- otaclient/app/boot_control/_errors.py | 98 ---- otaclient/app/boot_control/_grub.py | 72 ++- otaclient/app/boot_control/_rpi_boot.py | 69 +-- otaclient/app/boot_control/selecter.py | 7 +- otaclient/app/common.py | 113 +++- otaclient/app/ota_client_stub.py | 19 +- otaclient/app/proto/README.md | 2 +- tests/test_boot_control/test_cboot.py | 308 ---------- tests/test_boot_control/test_grub.py | 10 +- tests/test_common.py | 94 +-- 12 files changed, 534 insertions(+), 1577 deletions(-) delete mode 100644 otaclient/app/boot_control/_cboot.py delete mode 100644 otaclient/app/boot_control/_errors.py delete mode 100644 tests/test_boot_control/test_cboot.py diff --git a/otaclient/app/boot_control/_cboot.py b/otaclient/app/boot_control/_cboot.py deleted file mode 100644 index 38a3634a0..000000000 --- a/otaclient/app/boot_control/_cboot.py +++ /dev/null @@ -1,579 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import logging -import os -import re -from pathlib import Path -from functools import partial -from subprocess import CalledProcessError -from typing import Generator, Optional - - -from .. import errors as ota_errors -from ..common import ( - copytree_identical, - read_str_from_file, - subprocess_call, - subprocess_check_output, - write_str_to_file_sync, -) - -from ..proto import wrapper - -from ._common import ( - OTAStatusMixin, - PrepareMountMixin, - CMDHelperFuncs, - SlotInUseMixin, - VersionControlMixin, -) -from .configs import cboot_cfg as cfg -from .protocol import BootControllerProtocol -from .firmware import Firmware - - -logger = logging.getLogger(__name__) - - -class NvbootctrlError(Exception): - """Specific internal errors related to nvbootctrl cmd.""" - - -class Nvbootctrl: - """ - NOTE: slot and rootfs are binding accordingly! - partid mapping: p1->slot0, p2->slot1 - - slot num: 0->A, 1->B - """ - - EMMC_DEV: str = "mmcblk0" - NVME_DEV: str = "nvme0n1" - # slot0<->slot1 - CURRENT_STANDBY_FLIP = {"0": "1", "1": "0"} - # p1->slot0, p2->slot1 - PARTID_SLOTID_MAP = {"1": "0", "2": "1"} - # slot0->p1, slot1->p2 - SLOTID_PARTID_MAP = {v: k for k, v in PARTID_SLOTID_MAP.items()} - - # nvbootctrl - @staticmethod - def _nvbootctrl( - arg: str, *, target="rootfs", call_only=True, raise_exception=True - ) -> Optional[str]: - """ - Raises: - NvbootCtrlError if raise_exception is True. - """ - _cmd = f"nvbootctrl -t {target} {arg}" - try: - if call_only: - subprocess_call(_cmd, raise_exception=raise_exception) - return - else: - return subprocess_check_output(_cmd, raise_exception=raise_exception) - except CalledProcessError as e: - raise NvbootctrlError from e - - @classmethod - def check_rootdev(cls, dev: str) -> bool: - """ - check whether the givin dev is legal root dev or not - - NOTE: expect using UUID method to assign rootfs! - """ - pa = re.compile(r"\broot=(?P[\w=-]*)\b") - try: - if ma := pa.search(subprocess_check_output("cat /proc/cmdline")): - res = ma.group("rdev") - else: - raise ValueError(f"failed to find specific {dev} in cmdline") - except (CalledProcessError, ValueError) as e: - raise NvbootctrlError( - "rootfs detect failed or rootfs is not specified by PARTUUID in kernel cmdline" - ) from e - - uuid = res.split("=")[-1] - - return Path(CMDHelperFuncs.get_dev_by_partuuid(uuid)).resolve( - strict=True - ) == Path(dev).resolve(strict=True) - - @classmethod - def get_current_slot(cls) -> str: - if slot := cls._nvbootctrl("get-current-slot", call_only=False): - return slot - else: - raise NvbootctrlError - - @classmethod - def dump_slots_info(cls) -> Optional[str]: - return cls._nvbootctrl( - "dump-slots-info", call_only=False, raise_exception=False - ) - - @classmethod - def mark_boot_successful(cls): - """Mark current slot as boot successfully.""" - cls._nvbootctrl("mark-boot-successful") - - @classmethod - def set_active_boot_slot(cls, slot: str, target="rootfs"): - cls._nvbootctrl(f"set-active-boot-slot {slot}", target=target) - - @classmethod - def set_slot_as_unbootable(cls, slot: str): - cls._nvbootctrl(f"set-slot-as-unbootable {slot}") - - @classmethod - def is_slot_bootable(cls, slot: str) -> bool: - try: - cls._nvbootctrl(f"is-slot-bootable {slot}") - return True - except NvbootctrlError: - return False - - @classmethod - def is_slot_marked_successful(cls, slot: str) -> bool: - try: - cls._nvbootctrl(f"is-slot-marked-successful {slot}") - return True - except NvbootctrlError: - return False - - -class _CBootControl: - def __init__(self): - # NOTE: only support rqx-580, rqx-58g platform right now! - # detect the chip id - self.chip_id = read_str_from_file(cfg.TEGRA_CHIP_ID_PATH) - if not self.chip_id or int(self.chip_id) not in cfg.CHIP_ID_MODEL_MAP: - raise NotImplementedError( - f"unsupported platform found (chip_id: {self.chip_id}), abort" - ) - - self.chip_id = int(self.chip_id) - self.model = cfg.CHIP_ID_MODEL_MAP[self.chip_id] - logger.info(f"{self.model=}, (chip_id={hex(self.chip_id)})") - - # initializing dev info - self._init_dev_info() - logger.info(f"finished cboot control init: {Nvbootctrl.dump_slots_info()=}") - - def _init_dev_info(self): - self.current_slot: str = Nvbootctrl.get_current_slot() - self.current_rootfs_dev: str = CMDHelperFuncs.get_current_rootfs_dev() - # NOTE: boot dev is always emmc device now - self.current_boot_dev: str = f"/dev/{Nvbootctrl.EMMC_DEV}p{Nvbootctrl.SLOTID_PARTID_MAP[self.current_slot]}" - - self.standby_slot: str = Nvbootctrl.CURRENT_STANDBY_FLIP[self.current_slot] - standby_partid = Nvbootctrl.SLOTID_PARTID_MAP[self.standby_slot] - self.standby_boot_dev: str = f"/dev/{Nvbootctrl.EMMC_DEV}p{standby_partid}" - - # detect rootfs position - if self.current_rootfs_dev.find(Nvbootctrl.NVME_DEV) != -1: - logger.debug("rootfs on external storage detected, nvme rootfs is enable") - self.is_rootfs_on_external = True - self.standby_rootfs_dev = f"/dev/{Nvbootctrl.NVME_DEV}p{standby_partid}" - self.standby_slot_partuuid_str = CMDHelperFuncs.get_partuuid_str_by_dev( - self.standby_rootfs_dev - ) - elif self.current_rootfs_dev.find(Nvbootctrl.EMMC_DEV) != -1: - logger.debug("using internal storage as rootfs") - self.is_rootfs_on_external = False - self.standby_rootfs_dev = f"/dev/{Nvbootctrl.EMMC_DEV}p{standby_partid}" - self.standby_slot_partuuid_str = CMDHelperFuncs.get_partuuid_str_by_dev( - self.standby_rootfs_dev - ) - else: - raise NotImplementedError( - f"rootfs on {self.current_rootfs_dev} is not supported, abort" - ) - - # ensure rootfs is as expected - if not Nvbootctrl.check_rootdev(self.current_rootfs_dev): - msg = f"rootfs mismatch, expect {self.current_rootfs_dev} as rootfs" - raise NvbootctrlError(msg) - elif Nvbootctrl.check_rootdev(self.standby_rootfs_dev): - msg = ( - f"rootfs mismatch, expect {self.standby_rootfs_dev} as standby slot dev" - ) - raise NvbootctrlError(msg) - - logger.info("dev info initializing completed") - logger.info( - f"{self.current_slot=}, {self.current_boot_dev=}, {self.current_rootfs_dev=}" - ) - logger.info( - f"{self.standby_slot=}, {self.standby_boot_dev=}, {self.standby_rootfs_dev=}" - ) - - ###### CBootControl API ###### - def get_current_slot(self) -> str: - return self.current_slot - - def get_current_rootfs_dev(self) -> str: - return self.current_rootfs_dev - - def get_standby_rootfs_dev(self) -> str: - return self.standby_rootfs_dev - - def get_standby_slot(self) -> str: - return self.standby_slot - - def get_standby_rootfs_partuuid_str(self) -> str: - return self.standby_slot_partuuid_str - - def get_standby_boot_dev(self) -> str: - return self.standby_boot_dev - - def is_external_rootfs_enabled(self) -> bool: - return self.is_rootfs_on_external - - def mark_current_slot_boot_successful(self): - logger.info(f"mark {self.current_slot=} as boot successful") - Nvbootctrl.mark_boot_successful() - - def set_standby_slot_unbootable(self): - slot = self.standby_slot - Nvbootctrl.set_slot_as_unbootable(slot) - - def switch_boot(self): - slot = self.standby_slot - - logger.info(f"switch boot to {slot=}") - Nvbootctrl.set_active_boot_slot(slot, target="bootloader") - Nvbootctrl.set_active_boot_slot(slot) - - def is_current_slot_marked_successful(self) -> bool: - slot = self.current_slot - return Nvbootctrl.is_slot_marked_successful(slot) - - @staticmethod - def update_extlinux_cfg(dst: Path, ref: Path, partuuid_str: str): - """Write dst extlinux.conf based on reference extlinux.conf and partuuid_str. - - Params: - dst: path to dst extlinux.conf file - ref: reference extlinux.conf file - partuuid_str: rootfs specification string like "PARTUUID=" - """ - - def _replace(ma: re.Match, repl: str): - append_l: str = ma.group(0) - if append_l.startswith("#"): - return append_l - res, n = re.compile(r"root=[\w\-=]*").subn(repl, append_l) - if not n: - res = f"{append_l} {repl}" - - return res - - _repl_func = partial(_replace, repl=f"root={partuuid_str}") - write_str_to_file_sync( - dst, re.compile(r"\n\s*APPEND.*").sub(_repl_func, ref.read_text()) - ) - - -class CBootController( - PrepareMountMixin, - SlotInUseMixin, - OTAStatusMixin, - VersionControlMixin, - BootControllerProtocol, -): - def __init__(self) -> None: - try: - self._cboot_control: _CBootControl = _CBootControl() - - # load paths - ## first try to unmount standby dev if possible - self.standby_slot_dev = self._cboot_control.get_standby_rootfs_dev() - CMDHelperFuncs.umount(self.standby_slot_dev) - - self.standby_slot_mount_point = Path(cfg.MOUNT_POINT) - self.standby_slot_mount_point.mkdir(exist_ok=True) - - ## refroot mount point - _refroot_mount_point = cfg.ACTIVE_ROOT_MOUNT_POINT - # first try to umount refroot mount point - CMDHelperFuncs.umount(_refroot_mount_point) - if not os.path.isdir(_refroot_mount_point): - os.mkdir(_refroot_mount_point) - self.ref_slot_mount_point = Path(_refroot_mount_point) - - ## ota-status dir - ### current slot - self.current_ota_status_dir = Path(cfg.ACTIVE_ROOTFS_PATH) / Path( - cfg.OTA_STATUS_DIR - ).relative_to("/") - self.current_ota_status_dir.mkdir(parents=True, exist_ok=True) - ### standby slot - # NOTE: might not yet be populated before OTA update applied! - self.standby_ota_status_dir = self.standby_slot_mount_point / Path( - cfg.OTA_STATUS_DIR - ).relative_to("/") - - # init ota-status - self._init_boot_control() - except NotImplementedError as e: - raise ota_errors.BootControlPlatformUnsupported(module=__name__) from e - except Exception as e: - raise ota_errors.BootControlStartupFailed( - f"unspecific boot controller startup failure: {e!r}", module=__name__ - ) from e - - ###### private methods ###### - - def _init_boot_control(self): - """Init boot control and ota-status on start-up.""" - # load ota_status str and slot_in_use - _ota_status = self._load_current_ota_status() - _slot_in_use = self._load_current_slot_in_use() - current_slot = self._cboot_control.get_current_slot() - if not (_ota_status and _slot_in_use): - logger.info("initializing boot control files...") - _ota_status = wrapper.StatusOta.INITIALIZED - self._store_current_slot_in_use(current_slot) - self._store_current_ota_status(wrapper.StatusOta.INITIALIZED) - - if _ota_status in [wrapper.StatusOta.UPDATING, wrapper.StatusOta.ROLLBACKING]: - if self._is_switching_boot(): - logger.info("finalizing switching boot...") - # set the current slot(switched slot) as boot successful - self._cboot_control.mark_current_slot_boot_successful() - # switch ota_status - _ota_status = wrapper.StatusOta.SUCCESS - else: - if _ota_status == wrapper.StatusOta.ROLLBACKING: - _ota_status = wrapper.StatusOta.ROLLBACK_FAILURE - else: - _ota_status = wrapper.StatusOta.FAILURE - # status except UPDATING/ROLLBACKING remained as it - - # detect failed reboot, but only print error logging - if ( - _ota_status != wrapper.StatusOta.INITIALIZED - and _slot_in_use != current_slot - ): - logger.error( - f"boot into old slot {current_slot}, " - f"but slot_in_use indicates it should boot into {_slot_in_use}, " - "this might indicate a failed finalization at first reboot after update/rollback" - ) - - self.ota_status = _ota_status - self._store_current_ota_status(_ota_status) - logger.info(f"boot control init finished, ota_status is {_ota_status}") - - def _is_switching_boot(self) -> bool: - # evidence 1: nvbootctrl status - # the newly updated slot should not be marked as successful on the first reboot - _nvboot_res = not self._cboot_control.is_current_slot_marked_successful() - - # evidence 2: ota_status - # the newly updated/rollbacked slot should have ota-status as updating/rollback - _ota_status = self._load_current_ota_status() in [ - wrapper.StatusOta.UPDATING, - wrapper.StatusOta.ROLLBACKING, - ] - - # evidence 3: slot in use - # the slot_in_use file should have the same slot as current slot - _is_slot_in_use = ( - self._load_current_slot_in_use() == self._cboot_control.get_current_slot() - ) - - # NOTE(20230609): only check _ota_status_ and _is_slot_in_use, remove _nvboot_res check. - # as long as we are in UPDATING(_ota_status flag), - # and we should in this slot(_is_slot_in_use), then we are OK to finalize. - _is_switching_boot = _ota_status and _is_slot_in_use - logger.info( - "[switch_boot detection]\n" - f"ota_status is UPDATING in this slot: {_ota_status=}\n" - f"slot_in_use indicates we should in this slot: {_is_slot_in_use=}\n" - f"{_is_switching_boot=}" - ) - if _is_switching_boot and not _nvboot_res: - logger.warning( - f"{_ota_status=} and {_is_slot_in_use=} " - "show that we should be in finalizing switching boot stage," - f"but this slot is not marked as unbootable." - ) - return _is_switching_boot - - def _populate_boot_folder_to_separate_bootdev(self): - # mount the actual standby_boot_dev now - _boot_dir_mount_point = Path(cfg.SEPARATE_BOOT_MOUNT_POINT) - _boot_dir_mount_point.mkdir(exist_ok=True, parents=True) - - try: - CMDHelperFuncs.mount_rw( - self._cboot_control.get_standby_boot_dev(), - _boot_dir_mount_point, - ) - except Exception as e: - _msg = f"failed to mount standby boot dev: {e!r}" - logger.error(_msg) - raise NvbootctrlError(_msg) from e - - try: - dst = _boot_dir_mount_point / "boot" - dst.mkdir(exist_ok=True, parents=True) - src = self.standby_slot_mount_point / "boot" - - # copy the standby slot's boot folder to emmc boot dev - copytree_identical(src, dst) - except Exception as e: - _msg = f"failed to populate boot folder to separate bootdev: {e!r}" - logger.error(_msg) - raise NvbootctrlError(_msg) from e - finally: - # unmount standby emmc boot dev on finish/failure - try: - CMDHelperFuncs.umount(_boot_dir_mount_point) - except Exception as e: - _failure_msg = f"failed to umount boot dev: {e!r}" - logger.warning(_failure_msg) - # no need to raise to the caller - - ###### public methods ###### - # also includes methods from OTAStatusMixin, VersionControlMixin - # load_version, get_ota_status - - def on_operation_failure(self): - """Failure registering and cleanup at failure.""" - self._store_current_ota_status(wrapper.StatusOta.FAILURE) - # when standby slot is not created, otastatus is not needed to be set - if CMDHelperFuncs.is_target_mounted(self.standby_slot_mount_point): - self._store_standby_ota_status(wrapper.StatusOta.FAILURE) - - logger.warning("on failure try to unmounting standby slot...") - self._umount_all(ignore_error=True) - - def get_standby_slot_path(self) -> Path: - return self.standby_slot_mount_point - - def get_standby_boot_dir(self) -> Path: - """ - NOTE: in cboot controller, we directly use the /boot dir under the standby slot, - and sync to the external boot dev in the post_update if needed. - """ - return self.standby_slot_mount_point / "boot" - - def pre_update(self, version: str, *, standby_as_ref: bool, erase_standby=False): - try: - # store current slot status - _target_slot = self._cboot_control.get_standby_slot() - self._store_current_ota_status(wrapper.StatusOta.FAILURE) - self._store_current_slot_in_use(_target_slot) - - # setup updating - self._cboot_control.set_standby_slot_unbootable() - self._prepare_and_mount_standby( - self._cboot_control.get_standby_rootfs_dev(), - erase=erase_standby, - ) - self._mount_refroot( - standby_dev=self._cboot_control.get_standby_rootfs_dev(), - active_dev=self._cboot_control.get_current_rootfs_dev(), - standby_as_ref=standby_as_ref, - ) - - ### re-populate /boot/ota-status folder for standby slot - # create the ota-status folder unconditionally - self.standby_ota_status_dir.mkdir(exist_ok=True, parents=True) - # store status to standby slot - self._store_standby_ota_status(wrapper.StatusOta.UPDATING) - self._store_standby_version(version) - self._store_standby_slot_in_use(_target_slot) - - logger.info("pre-update setting finished") - except Exception as e: - _err_msg = f"failed on pre_update: {e!r}" - logger.exception(_err_msg) - raise ota_errors.BootControlPreUpdateFailed( - f"{e!r}", module=__name__ - ) from e - - def post_update(self) -> Generator[None, None, None]: - try: - # firmware update - firmware = Firmware( - self.standby_slot_mount_point - / Path(cfg.FIRMWARE_CONFIG).relative_to("/") - ) - firmware.update(int(self._cboot_control.get_standby_slot())) - - # update extlinux_cfg file - _extlinux_cfg = self.standby_slot_mount_point / Path( - cfg.EXTLINUX_FILE - ).relative_to("/") - self._cboot_control.update_extlinux_cfg( - dst=_extlinux_cfg, - ref=_extlinux_cfg, - partuuid_str=self._cboot_control.get_standby_rootfs_partuuid_str(), - ) - - # NOTE: we didn't prepare /boot/ota here, - # process_persistent does this for us - if self._cboot_control.is_external_rootfs_enabled(): - logger.info( - "rootfs on external storage detected: " - "updating the /boot folder in standby bootdev..." - ) - self._populate_boot_folder_to_separate_bootdev() - - logger.info("post update finished, rebooting...") - self._umount_all(ignore_error=True) - self._cboot_control.switch_boot() - - logger.info(f"[post-update]: {Nvbootctrl.dump_slots_info()=}") - yield # hand over control back to otaclient - CMDHelperFuncs.reboot() - except Exception as e: - _err_msg = f"failed on post_update: {e!r}" - logger.exception(_err_msg) - raise ota_errors.BootControlPostUpdateFailed( - _err_msg, module=__name__ - ) from e - - def pre_rollback(self): - try: - self._store_current_ota_status(wrapper.StatusOta.FAILURE) - self._prepare_and_mount_standby( - self._cboot_control.get_standby_rootfs_dev(), - erase=False, - ) - # store ROLLBACKING status to standby - self._store_standby_ota_status(wrapper.StatusOta.ROLLBACKING) - except Exception as e: - _err_msg = f"failed on pre_rollback: {e!r}" - logger.exception(_err_msg) - raise ota_errors.BootControlPreRollbackFailed( - _err_msg, module=__name__ - ) from e - - def post_rollback(self): - try: - self._cboot_control.switch_boot() - CMDHelperFuncs.reboot() - except Exception as e: - _err_msg = f"failed on post_rollback: {e!r}" - logger.exception(_err_msg) - raise ota_errors.BootControlPostRollbackFailed( - _err_msg, module=__name__ - ) from e diff --git a/otaclient/app/boot_control/_common.py b/otaclient/app/boot_control/_common.py index 0ba392b35..401d316a7 100644 --- a/otaclient/app/boot_control/_common.py +++ b/otaclient/app/boot_control/_common.py @@ -14,20 +14,13 @@ r"""Shared utils for boot_controller.""" +from __future__ import annotations import logging -import os import shutil import sys from pathlib import Path from subprocess import CalledProcessError -from typing import List, Optional, Union, Callable - -from ._errors import ( - BootControlError, - MountError, - MkfsError, - MountFailedReason, -) +from typing import Literal, Optional, Union, Callable, NoReturn from ..configs import config as cfg from ..common import ( @@ -41,350 +34,323 @@ logger = logging.getLogger(__name__) +# fmt: off +PartitionToken = Literal[ + "UUID", "PARTUUID", + "LABEL", "PARTLABEL", + "TYPE", +] +# fmt: on + class CMDHelperFuncs: - """HelperFuncs bundle for wrapped linux cmd.""" + """HelperFuncs bundle for wrapped linux cmd. - @staticmethod - @MountFailedReason.parse_failed_reason - def _mount( - dev: str, - mount_point: str, - *, - options: Optional[List[str]] = None, - args: Optional[List[str]] = None, - ): - """ - mount [-o option1[,option2, ...]]] [args[0] [args[1]...]] + When underlying subprocess call failed and is True, + functions defined in this class will raise the original exception + to the upper caller. + """ - Raises: - MountError on failed mounting. - """ - _option_str = "" - if options: - _option_str = f"-o {','.join(options)}" + @classmethod + def get_attrs_by_dev( + cls, attr: PartitionToken, dev: Path | str, *, raise_exception: bool = True + ) -> str: + """Get from . - _args_str = "" - if args: - _args_str = f"{' '.join(args)}" + This is implemented by calling: + `lsblk -in -o ` - _cmd = f"mount {_option_str} {_args_str} {dev} {mount_point}" - subprocess_call(_cmd, raise_exception=True) + Args: + attr (PartitionToken): the attribute to retrieve from the . + dev (Path | str): the target device path. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. - @staticmethod - def _findfs(key: str, value: str) -> str: - """ - findfs finds a partition by conditions - Usage: - findfs [options] {LABEL,UUID,PARTUUID,PARTLABEL}= + Returns: + str: of . """ - _cmd = f"findfs {key}={value}" - return subprocess_check_output(_cmd) - - @staticmethod - def _findmnt(args: str) -> str: - _cmd = f"findmnt {args}" - return subprocess_check_output(_cmd) + cmd = ["lsblk", "-ino", attr, str(dev)] + return subprocess_check_output(cmd, raise_exception=raise_exception) - @staticmethod - def _lsblk(args: str, *, raise_exception=False) -> str: - """lsblk command wrapper. + @classmethod + def get_dev_by_token( + cls, token: PartitionToken, value: str, *, raise_exception: bool = True + ) -> Optional[list[str]]: + """Get a list of device(s) that matches the = pair. - Default return empty str if raise_exception==False. - """ - _cmd = f"lsblk {args}" - return subprocess_check_output( - _cmd, - raise_exception=raise_exception, - default="", - ) + This is implemented by calling: + blkid -o device -t = - ###### derived helper methods ###### + Args: + token (PartitionToken): which attribute of device to match. + value (str): the value of the attribute. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. - @classmethod - def get_fslabel_by_dev(cls, dev: str) -> str: - """Return the fslabel of the dev if any or empty str.""" - args = f"-in -o LABEL {dev}" - return cls._lsblk(args) + Returns: + Optional[list[str]]: If there is at least one device found, return a list + contains all found device(s), otherwise None. + """ + cmd = ["blkid", "-o", "device", "-t", f"{token}={value}"] + if res := subprocess_check_output(cmd, raise_exception=raise_exception): + return res.splitlines() @classmethod - def get_partuuid_by_dev(cls, dev: str) -> str: - """Return partuuid of input device.""" - args = f"-in -o PARTUUID {dev}" - try: - return cls._lsblk(args, raise_exception=True) - except Exception as e: - msg = f"failed to get partuuid for {dev}: {e!r}" - raise ValueError(msg) from None + def get_current_rootfs_dev(cls, *, raise_exception: bool = True) -> str: + """Get the devpath of current rootfs dev. - @classmethod - def get_uuid_by_dev(cls, dev: str) -> str: - """Return uuid of input device.""" - args = f"-in -o UUID {dev}" - try: - return cls._lsblk(args, raise_exception=True) - except Exception as e: - msg = f"failed to get uuid for {dev}: {e!r}" - raise ValueError(msg) from None + This is implemented by calling + findmnt -nfc -o SOURCE - @classmethod - def get_uuid_str_by_dev(cls, dev: str) -> str: - """Return UUID string of input device. + Args: + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. Returns: - str like: "UUID=" + str: the devpath of current rootfs device. """ - return f"UUID={cls.get_uuid_by_dev(dev)}" + cmd = ["findmnt", "-nfco", "SOURCE", cfg.ACTIVE_ROOTFS_PATH] + return subprocess_check_output(cmd, raise_exception=raise_exception) @classmethod - def get_partuuid_str_by_dev(cls, dev: str) -> str: - """Return PARTUUID string of input device. + def get_mount_point_by_dev(cls, dev: str, *, raise_exception: bool = True) -> str: + """Get the FIRST mountpoint of the . + + This is implemented by calling: + findmnt -nfo TARGET + + NOTE: option -f is used to only show the first file system. + + Args: + dev (str): the device to check against. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. Returns: - str like: "PARTUUID=" + str: the FIRST mountpint of the , or empty string if is False + and the subprocess call failed(due to dev is not mounted or other reasons). """ - return f"PARTUUID={cls.get_partuuid_by_dev(dev)}" - - @classmethod - def get_dev_by_partlabel(cls, partlabel: str) -> str: - return cls._findfs("PARTLABEL", partlabel) + cmd = ["findmnt", "-nfo", "TARGET", dev] + return subprocess_check_output(cmd, raise_exception=raise_exception) @classmethod - def get_dev_by_fslabel(cls, fslabel: str) -> str: - return cls._findfs("LABEL", fslabel) + def get_dev_by_mount_point( + cls, mount_point: str, *, raise_exception: bool = True + ) -> str: + """Return the source dev of the given . - @classmethod - def get_dev_by_partuuid(cls, partuuid: str) -> str: - return cls._findfs("PARTUUID", partuuid) + This is implemented by calling: + findmnt -no SOURCE - @classmethod - def get_current_rootfs_dev(cls) -> str: - """ - NOTE: - -o : only print - -n: no headings - -f: only show the first file system - -c: canonicalize printed paths + Args: + mount_point (str): mount_point to check against. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. Returns: - full path to dev of the current rootfs + str: the source device of . """ - if res := cls._findmnt("/ -o SOURCE -n -f -c"): - return res - else: - raise BootControlError("failed to detect current rootfs") + cmd = ["findmnt", "-no", "SOURCE", mount_point] + return subprocess_check_output(cmd, raise_exception=raise_exception) @classmethod - def get_mount_point_by_dev(cls, dev: str, *, raise_exception=True) -> str: - """ - findmnt -o TARGET -n + def is_target_mounted( + cls, target: Path | str, *, raise_exception: bool = True + ) -> bool: + """Check if is mounted or not. can be a dev or a mount point. - NOTE: findmnt raw result might have multiple lines if target dev is bind mounted. - use option -f to only show the first file system - """ - mount_points = cls._findmnt(f"{dev} -o TARGET -n -f") - if not mount_points and raise_exception: - raise MountError(f"{dev} is not mounted") + This is implemented by calling: + findmnt - return mount_points + Args: + target (Path | str): the target to check against. Could be a device or a mount point. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. - @classmethod - def get_dev_by_mount_point(cls, mount_point: str) -> str: - """Return the underlying mounted dev of the given mount_point.""" - return cls._findmnt(f"-no SOURCE {mount_point}") + Returns: + bool: return True if the target has at least one mount_point. + """ + cmd = ["findmnt", target] + return bool(subprocess_check_output(cmd, raise_exception=raise_exception)) @classmethod - def is_target_mounted(cls, target: Union[Path, str]) -> bool: - return cls._findmnt(f"{target}") != "" + def get_parent_dev(cls, child_device: str, *, raise_exception: bool = True) -> str: + """Get the parent devpath from . - @classmethod - def get_parent_dev(cls, child_device: str) -> str: - """ When `/dev/nvme0n1p1` is specified as child_device, /dev/nvme0n1 is returned. - cmd params: - -d: print the result from specified dev only. - """ - cmd = f"-idpn -o PKNAME {child_device}" - if res := cls._lsblk(cmd): - return res - else: - raise ValueError(f"{child_device} not found or not a partition") + This function is implemented by calling: + lsblk -idpno PKNAME - @classmethod - def get_dev_family(cls, parent_device: str, *, include_parent=True) -> List[str]: - """ - When `/dev/nvme0n1` is specified as parent_device, - ["/dev/nvme0n1", "/dev/nvme0n1p1", "/dev/nvme0n1p2"...] will be return - """ - cmd = f"-Pp -o NAME {parent_device}" - res = list( - map( - lambda line: line.split("=")[-1].strip('"'), - cls._lsblk(cmd).splitlines(), - ) - ) + Args: + child_device (str): the device to find parent device from. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. - # the first line is the parent of the dev family - if include_parent: - return res - else: - return res[1:] + Returns: + str: the parent device of the specific . + """ + cmd = ["lsblk", "-idpno", "PKNAME", child_device] + return subprocess_check_output(cmd, raise_exception=raise_exception) @classmethod - def get_dev_size(cls, dev: str) -> int: - """Return the size of dev by bytes. + def set_ext4_fslabel(cls, dev: str, fslabel: str, *, raise_exception: bool = True): + """Set to ext4 formatted . + + This is implemented by calling: + e2label - NOTE: - -d: print the result from specified dev only - -b: print size in bytes - -n: no headings + Args: + dev (str): the ext4 partition device. + fslabel (str): the fslabel to be set. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. """ - cmd = f"-dbn -o SIZE {dev}" - try: - return int(cls._lsblk(cmd)) - except ValueError: - raise ValueError(f"failed to get size of {dev}") from None + cmd = ["e2label", dev, fslabel] + subprocess_call(cmd, raise_exception=raise_exception) @classmethod - def set_dev_fslabel(cls, dev: str, fslabel: str): - cmd = f"e2label {dev} {fslabel}" - try: - subprocess_call(cmd, raise_exception=True) - except Exception as e: - raise ValueError(f"failed to set {fslabel=} to {dev=}: {e!r}") from None + def mount_rw( + cls, target: str, mount_point: Path | str, *, raise_exception: bool = True + ): + """Mount the to read-write. - @classmethod - def mount_rw(cls, target: str, mount_point: Union[Path, str]): - """Mount the target to the mount_point read-write. + This is implemented by calling: + mount -o rw --make-private --make-unbindable NOTE: pass args = ["--make-private", "--make-unbindable"] to prevent mount events propagation to/from this mount point. - Raises: - MountError on failed mounting. + Args: + target (str): target to be mounted. + mount_point (Path | str): mount point to mount to. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. """ - options = ["rw"] - args = ["--make-private", "--make-unbindable"] - cls._mount(target, str(mount_point), options=options, args=args) + # fmt: off + cmd = [ + "mount", + "-o", "rw", + "--make-private", "--make-unbindable", + target, + str(mount_point), + ] + # fmt: on + subprocess_call(cmd, raise_exception=raise_exception) @classmethod - def bind_mount_ro(cls, target: str, mount_point: Union[Path, str]): - """Bind mount the target to the mount_point read-only. + def bind_mount_ro( + cls, target: str, mount_point: Path | str, *, raise_exception: bool = True + ): + """Bind mount the to read-only. - NOTE: pass args = ["--make-private", "--make-unbindable"] to prevent - mount events propagation to/from this mount point. + This is implemented by calling: + mount -o bind,ro --make-private --make-unbindable - Raises: - MountError on failed mounting. + Args: + target (str): target to be mounted. + mount_point (Path | str): mount point to mount to. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. """ - options = ["bind", "ro"] - args = ["--make-private", "--make-unbindable"] - cls._mount(target, str(mount_point), options=options, args=args) + # fmt: off + cmd = [ + "mount", + "-o", "bind,ro", + "--make-private", "--make-unbindable", + target, + str(mount_point) + ] + # fmt: on + subprocess_call(cmd, raise_exception=raise_exception) @classmethod - def umount(cls, target: Union[Path, str], *, ignore_error=False): - """Try to unmount the . + def umount(cls, target: Path | str, *, raise_exception: bool = True): + """Try to umount the . + + This is implemented by calling: + umount - Raises: - If ignore_error is False, raises MountError on failed unmounting. + Before calling umount, the will be check whether it is mounted, + if it is not mounted, this function will return directly. + + Args: + target (Path | str): target to be umounted. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. """ # first try to check whether the target(either a mount point or a dev) # is mounted - if not cls.is_target_mounted(target): + if not cls.is_target_mounted(target, raise_exception=False): return # if the target is mounted, try to unmount it. - try: - _cmd = f"umount -l {target}" - subprocess_call(_cmd, raise_exception=True) - except CalledProcessError as e: - _failure_msg = ( - f"failed to umount {target}: {e.returncode=}, {e.stderr=}, {e.stdout=}" - ) - logger.warning(_failure_msg) - - if not ignore_error: - raise MountError(_failure_msg) from None - - @classmethod - def mkfs_ext4(cls, dev: str, *, fslabel: Optional[str] = None): - """Call mkfs.ext4 on . - - Raises: - MkfsError on failed ext4 partition formatting. - """ - # NOTE: preserve the UUID and FSLABEL(if set) - _specify_uuid = "" - try: - _specify_uuid = f"-U {cls.get_uuid_by_dev(dev)}" - except Exception: - pass - - # if fslabel is specified, then use it, - # otherwise try to detect the previously set one - _specify_fslabel = "" - if fslabel: - _specify_fslabel = f"-L {fslabel}" - elif _fslabel := cls.get_fslabel_by_dev(dev): - _specify_fslabel = f"-L {_fslabel}" - - try: - logger.warning(f"format {dev} to ext4...") - _cmd = f"mkfs.ext4 {_specify_uuid} {_specify_fslabel} {dev}" - subprocess_call(_cmd, raise_exception=True) - except CalledProcessError as e: - _failure_msg = f"failed to apply mkfs.ext4 on {dev}: {e!r}" - logger.error(_failure_msg) - raise MkfsError(_failure_msg) + _cmd = ["umount", str(target)] + subprocess_call(_cmd, raise_exception=raise_exception) @classmethod - def mount_refroot( + def mkfs_ext4( cls, - standby_slot_dev: str, - active_slot_dev: str, - refroot_mount_point: str, + dev: str, *, - standby_as_ref: bool, + fslabel: Optional[str] = None, + fsuuid: Optional[str] = None, + raise_exception: bool = True, ): - """Mount reference rootfs that we copy files from to . + """Create new ext4 formatted filesystem on , optionally with + and/or . - This method bind mount refroot as ro with make-private flag and make-unbindable flag, - to prevent ANY accidental writes/changes to the refroot. - - Raises: - MountError on failed mounting. + Args: + dev (str): device to be formatted to ext4. + fslabel (Optional[str], optional): fslabel of the new ext4 filesystem. Defaults to None. + When it is None, this function will try to preserve the previous fslabel. + fsuuid (Optional[str], optional): fsuuid of the new ext4 filesystem. Defaults to None. + When it is None, this function will try to preserve the previous fsuuid. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. """ - _refroot_dev = standby_slot_dev if standby_as_ref else active_slot_dev + cmd = ["mkfs.ext4", "-F"] - # NOTE: set raise_exception to false to allow not mounted - # not mounted dev will have empty return str - if _refroot_active_mount_point := CMDHelperFuncs.get_mount_point_by_dev( - _refroot_dev, raise_exception=False - ): - CMDHelperFuncs.bind_mount_ro( - _refroot_active_mount_point, - refroot_mount_point, - ) - else: - # NOTE: refroot is expected to be mounted, - # if refroot is active rootfs, it is mounted as /; - # if refroot is standby rootfs(in-place update mode), - # it will be mounted to /mnt/standby(rw), so we still mount it as bind,ro - raise MountError("refroot is expected to be mounted") + if not fsuuid: + try: + fsuuid = cls.get_attrs_by_dev("UUID", dev) + assert fsuuid + logger.debug(f"reuse previous UUID: {fsuuid}") + except Exception: + pass + if fsuuid: + logger.debug(f"using UUID: {fsuuid}") + cmd.extend(["-U", fsuuid]) + + if not fslabel: + try: + fslabel = cls.get_attrs_by_dev("LABEL", dev) + assert fslabel + logger.debug(f"reuse previous fs LABEL: {fslabel}") + except Exception: + pass + if fslabel: + logger.debug(f"using fs LABEL: {fslabel}") + cmd.extend(["-L", fslabel]) + + cmd.append(dev) + logger.warning(f"format {dev} to ext4: {cmd=}") + subprocess_call(cmd, raise_exception=raise_exception) @classmethod - def mount_ro(cls, *, target: str, mount_point: Union[str, Path]): - """Mount target on mount_point read-only. + def mount_ro( + cls, *, target: str, mount_point: str | Path, raise_exception: bool = True + ): + """Mount to read-only. - If the target device is mounted, we bind mount the target device to mount_point, + If the target device is mounted, we bind mount the target device to mount_point. if the target device is not mounted, we directly mount it to the mount_point. - This method mount the target as ro with make-private flag and make-unbindable flag, - to prevent ANY accidental writes/changes to the target. - - Raises: - MountError on failed mounting. + Args: + target (str): target to be mounted. + mount_point (str | Path): mount point to mount to. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. """ # NOTE: set raise_exception to false to allow not mounted # not mounted dev will have empty return str @@ -394,30 +360,47 @@ def mount_ro(cls, *, target: str, mount_point: Union[str, Path]): CMDHelperFuncs.bind_mount_ro( _active_mount_point, mount_point, + raise_exception=raise_exception, ) else: # target is not mounted, we mount it by ourself - options = ["ro"] - args = ["--make-private", "--make-unbindable"] - cls._mount(target, str(mount_point), options=options, args=args) + # fmt: off + cmd = [ + "mount", + "-o", "ro", + "--make-private", "--make-unbindable", + target, + str(mount_point), + ] + # fmt: on + subprocess_call(cmd, raise_exception=raise_exception) @classmethod - def reboot(cls): - """Reboot the whole system otaclient running at and terminate otaclient. + def reboot(cls, args: Optional[list[str]] = None) -> NoReturn: + """Reboot the system, with optional args passed to reboot command. + + This is implemented by calling: + reboot [args[0], args[1], ...] + + NOTE(20230614): this command makes otaclient exit immediately. + NOTE(20240421): rpi_boot's reboot takes args. - NOTE(20230614): this command MUST also make otaclient exit immediately. + Args: + args (Optional[list[str]], optional): args passed to reboot command. + Defaults to None, not passing any args. """ - os.sync() + cmd = ["reboot"] + if args: + cmd.extend(args) + try: - subprocess_call("reboot", raise_exception=True) + subprocess_call(cmd, raise_exception=True) sys.exit(0) except CalledProcessError: logger.exception("failed to reboot") raise -###### helper mixins ###### - FinalizeSwitchBootFunc = Callable[[], bool] @@ -665,114 +648,6 @@ def booted_ota_status(self) -> wrapper.StatusOta: return self._ota_status -class SlotInUseMixin: - current_ota_status_dir: Path - standby_ota_status_dir: Path - - def _store_current_slot_in_use(self, _slot: str): - write_str_to_file_sync( - self.current_ota_status_dir / cfg.SLOT_IN_USE_FNAME, _slot - ) - - def _store_standby_slot_in_use(self, _slot: str): - write_str_to_file_sync( - self.standby_ota_status_dir / cfg.SLOT_IN_USE_FNAME, _slot - ) - - def _load_current_slot_in_use(self) -> Optional[str]: - if res := read_str_from_file( - self.current_ota_status_dir / cfg.SLOT_IN_USE_FNAME, default="" - ): - return res - - -class OTAStatusMixin: - current_ota_status_dir: Path - standby_ota_status_dir: Path - ota_status: wrapper.StatusOta - - def _store_current_ota_status(self, _status: wrapper.StatusOta): - write_str_to_file_sync( - self.current_ota_status_dir / cfg.OTA_STATUS_FNAME, _status.name - ) - - def _store_standby_ota_status(self, _status: wrapper.StatusOta): - write_str_to_file_sync( - self.standby_ota_status_dir / cfg.OTA_STATUS_FNAME, _status.name - ) - - def _load_current_ota_status(self) -> Optional[wrapper.StatusOta]: - if _status_str := read_str_from_file( - self.current_ota_status_dir / cfg.OTA_STATUS_FNAME - ).upper(): - try: - return wrapper.StatusOta[_status_str] - except KeyError: - pass # invalid status string - - def get_booted_ota_status(self) -> wrapper.StatusOta: - return self.ota_status - - -class VersionControlMixin: - current_ota_status_dir: Path - standby_ota_status_dir: Path - - def _store_standby_version(self, _version: str): - write_str_to_file_sync( - self.standby_ota_status_dir / cfg.OTA_VERSION_FNAME, - _version, - ) - - def load_version(self) -> str: - _version = read_str_from_file( - self.current_ota_status_dir / cfg.OTA_VERSION_FNAME, - missing_ok=True, - default="", - ) - if not _version: - logger.warning("version file not found, return empty version string") - - return _version - - -class PrepareMountMixin: - standby_slot_mount_point: Path - ref_slot_mount_point: Path - - def _prepare_and_mount_standby(self, standby_slot_dev: str, *, erase=False): - self.standby_slot_mount_point.mkdir(parents=True, exist_ok=True) - - # first try umount the dev - CMDHelperFuncs.umount(standby_slot_dev) - - # format the whole standby slot if needed - if erase: - logger.warning(f"perform mkfs.ext4 on standby slot({standby_slot_dev})") - CMDHelperFuncs.mkfs_ext4(standby_slot_dev) - - # try to mount the standby dev - CMDHelperFuncs.mount_rw(standby_slot_dev, self.standby_slot_mount_point) - - def _mount_refroot( - self, - *, - standby_dev: str, - active_dev: str, - standby_as_ref: bool, - ): - CMDHelperFuncs.mount_refroot( - standby_slot_dev=standby_dev, - active_slot_dev=active_dev, - refroot_mount_point=str(self.ref_slot_mount_point), - standby_as_ref=standby_as_ref, - ) - - def _umount_all(self, *, ignore_error: bool = False): - CMDHelperFuncs.umount(self.standby_slot_mount_point, ignore_error=ignore_error) - CMDHelperFuncs.umount(self.ref_slot_mount_point, ignore_error=ignore_error) - - class SlotMountHelper: """Helper class that provides methods for mounting slots.""" @@ -800,57 +675,27 @@ def __init__( cfg.BOOT_DIR ).relative_to("/") - def mount_standby(self, *, raise_exc: bool = True) -> bool: - """Mount standby slot dev to . - - Args: - erase_standby: whether to format standby slot dev to ext4 before mounting. - raise_exc: if exception occurs, raise it or not. - - Return: - A bool indicates whether the mount succeeded or not. - """ + def mount_standby(self) -> None: + """Mount standby slot dev to .""" logger.debug("mount standby slot rootfs dev...") - try: - # first try umount mount point and dev - CMDHelperFuncs.umount(self.standby_slot_mount_point, ignore_error=True) - if CMDHelperFuncs.is_target_mounted(self.standby_slot_dev): - CMDHelperFuncs.umount(self.standby_slot_dev) - - # try to mount the standby dev - CMDHelperFuncs.mount_rw( - target=self.standby_slot_dev, - mount_point=self.standby_slot_mount_point, - ) - return True - except Exception: - if raise_exc: - raise - return False - - def mount_active(self, *, raise_exc: bool = True) -> bool: - """Mount active rootfs ready-only. + if CMDHelperFuncs.is_target_mounted( + self.standby_slot_dev, raise_exception=False + ): + logger.debug(f"{self.standby_slot_dev=} is mounted, try to umount it ...") + CMDHelperFuncs.umount(self.standby_slot_dev, raise_exception=False) - Args: - raise_exc: if exception occurs, raise it or not. + CMDHelperFuncs.mount_rw( + target=self.standby_slot_dev, + mount_point=self.standby_slot_mount_point, + ) - Return: - A bool indicates whether the mount succeeded or not. - """ - # first try umount the mount_point + def mount_active(self) -> None: + """Mount active rootfs ready-only.""" logger.debug("mount active slot rootfs dev...") - try: - CMDHelperFuncs.umount(self.active_slot_mount_point, ignore_error=True) - # mount active slot ro, unpropagated - CMDHelperFuncs.mount_ro( - target=self.active_slot_dev, - mount_point=self.active_slot_mount_point, - ) - return True - except Exception: - if raise_exc: - raise - return False + CMDHelperFuncs.mount_ro( + target=self.active_slot_dev, + mount_point=self.active_slot_mount_point, + ) def preserve_ota_folder_to_standby(self): """Copy the /boot/ota folder to standby slot to preserve it. @@ -866,10 +711,29 @@ def preserve_ota_folder_to_standby(self): except Exception as e: raise ValueError(f"failed to copy /boot/ota from active to standby: {e!r}") + def prepare_standby_dev( + self, + *, + erase_standby: bool = False, + fslabel: Optional[str] = None, + ) -> None: + CMDHelperFuncs.umount(self.standby_slot_dev, raise_exception=False) + if erase_standby: + return CMDHelperFuncs.mkfs_ext4(self.standby_slot_dev, fslabel=fslabel) + + # TODO: in the future if in-place update mode is implemented, do a + # fschck over the standby slot file system. + if fslabel: + CMDHelperFuncs.set_ext4_fslabel(self.active_slot_dev, fslabel=fslabel) + def umount_all(self, *, ignore_error: bool = False): logger.debug("unmount standby slot and active slot mount point...") - CMDHelperFuncs.umount(self.standby_slot_mount_point, ignore_error=ignore_error) - CMDHelperFuncs.umount(self.active_slot_mount_point, ignore_error=ignore_error) + CMDHelperFuncs.umount( + self.standby_slot_mount_point, raise_exception=ignore_error + ) + CMDHelperFuncs.umount( + self.active_slot_mount_point, raise_exception=ignore_error + ) def cat_proc_cmdline(target: str = "/proc/cmdline") -> str: diff --git a/otaclient/app/boot_control/_errors.py b/otaclient/app/boot_control/_errors.py deleted file mode 100644 index b2ba7bbd3..000000000 --- a/otaclient/app/boot_control/_errors.py +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -r"""Internal error types used by boot_control module.""" - -from enum import Enum, auto -from subprocess import CalledProcessError -from typing import Optional, Callable - - -class MountFailedReason(Enum): - # error code - SUCCESS = 0 - PERMISSIONS_ERROR = 1 - SYSTEM_ERROR = 2 - INTERNAL_ERROR = 4 - USER_INTERRUPT = 8 - GENERIC_MOUNT_FAILURE = 32 - - # custom error code - # specific reason for generic mount failure - TARGET_NOT_FOUND = auto() - TARGET_ALREADY_MOUNTED = auto() - MOUNT_POINT_NOT_FOUND = auto() - BIND_MOUNT_ON_NON_DIR = auto() - - @classmethod - def parse_failed_reason(cls, func: Callable): - from functools import wraps - - @wraps(func) - def _wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except CalledProcessError as e: - _called_process_error_str = ( - f"{e.cmd=} failed: {e.returncode=}, {e.stderr=}, {e.stdout=}" - ) - - _reason = cls(e.returncode) - if _reason != cls.GENERIC_MOUNT_FAILURE: - raise MountError( - _called_process_error_str, - fail_reason=_reason, - ) from None - - # if the return code is 32, determine the detailed reason - # of the mount failure - _error_msg, _fail_reason = str(e.stderr), cls.GENERIC_MOUNT_FAILURE - if _error_msg.find("already mounted") != -1: - _fail_reason = cls.TARGET_ALREADY_MOUNTED - elif _error_msg.find("mount point does not exist") != -1: - _fail_reason = cls.MOUNT_POINT_NOT_FOUND - elif _error_msg.find("does not exist") != -1: - _fail_reason = cls.TARGET_NOT_FOUND - elif _error_msg.find("Not a directory") != -1: - _fail_reason = cls.BIND_MOUNT_ON_NON_DIR - - raise MountError( - _called_process_error_str, - fail_reason=_fail_reason, - ) from None - - return _wrapper - - -class BootControlError(Exception): - """Internal used exception type related to boot control errors. - NOTE: this exception should not be directly raised to upper caller, - it should always be wrapped by a specific OTA Error type. - check app.errors for details. - """ - - -class MountError(BootControlError): - def __init__( - self, *args: object, fail_reason: Optional["MountFailedReason"] = None - ) -> None: - super().__init__(*args) - self.fail_reason = fail_reason - - -class ABPartitionError(BootControlError): - ... - - -class MkfsError(BootControlError): - ... diff --git a/otaclient/app/boot_control/_grub.py b/otaclient/app/boot_control/_grub.py index f4678f248..9cc4cdc89 100644 --- a/otaclient/app/boot_control/_grub.py +++ b/otaclient/app/boot_control/_grub.py @@ -31,6 +31,7 @@ """ +from __future__ import annotations import logging import re import shutil @@ -327,9 +328,16 @@ def _get_sibling_dev(self, active_dev: str) -> str: raise ValueError(_err_msg) # list children device file from parent device - cmd = f"-Pp -o NAME,FSTYPE {parent}" + cmd = ["lsblk", "-Ppo", "NAME,FSTYPE", parent] + try: + cmd_result = subprocess_check_output(cmd, raise_exception=True) + except Exception as e: + _err_msg = f"failed to detect boot device family tree: {e!r}" + logger.error(_err_msg) + raise _GrubBootControllerError(_err_msg) from e + # exclude parent dev - output = CMDHelperFuncs._lsblk(cmd).splitlines()[1:] + output = cmd_result.splitlines()[1:] # FSTYPE="ext4" and # not (parent_device_file, root_device_file and boot_device_file) for blk in output: @@ -352,7 +360,14 @@ def _detect_active_slot(self) -> Tuple[str, str]: A tuple contains the slot_name and the full dev path of the active slot. """ - dev_path = CMDHelperFuncs.get_current_rootfs_dev() + try: + dev_path = CMDHelperFuncs.get_current_rootfs_dev() + assert dev_path + except Exception as e: + _err_msg = f"failed to detect current rootfs dev: {e!r}" + logger.error(_err_msg) + raise _GrubBootControllerError(_err_msg) from e + _dev_path_ma = self.DEV_PATH_PA.match(dev_path) assert _dev_path_ma, f"dev path is invalid for OTA: {dev_path}" @@ -605,7 +620,17 @@ def _grub_update_on_booted_slot(self, *, abort_on_standby_missed=True): active_slot_grub_file = self.active_ota_partition_folder / cfg.GRUB_CFG_FNAME grub_cfg_content = GrubHelper.grub_mkconfig() - standby_uuid_str = CMDHelperFuncs.get_uuid_str_by_dev(self.standby_root_dev) + try: + standby_uuid = CMDHelperFuncs.get_attrs_by_dev( + "UUID", self.standby_root_dev + ) + assert standby_uuid + except Exception as e: + _err_msg = f"failed to get UUID of {self.standby_root_dev}: {e!r}" + logger.error(_err_msg) + raise _GrubBootControllerError(_err_msg) from e + + standby_uuid_str = f"UUID={standby_uuid}" if grub_cfg_updated := GrubHelper.update_entry_rootfs( grub_cfg_content, kernel_ver=GrubHelper.SUFFIX_OTA_STANDBY, @@ -667,27 +692,6 @@ def _ensure_standby_slot_boot_files_symlinks(self, standby_slot: str): # API - def prepare_standby_dev(self, *, erase_standby: bool): - """ - Args: - erase_standby: indicate boot_controller whether to format the - standby slot's file system or not. This value is indicated and - passed to boot controller by the standby slot creator. - """ - try: - # try to unmount the standby root dev unconditionally - if CMDHelperFuncs.is_target_mounted(self.standby_root_dev): - CMDHelperFuncs.umount(self.standby_root_dev) - - if erase_standby: - CMDHelperFuncs.mkfs_ext4(self.standby_root_dev) - # TODO: check the standby file system status - # if not erase the standby slot - except Exception as e: - _err_msg = f"failed to prepare standby dev: {e!r}" - logger.error(_err_msg) - raise _GrubBootControllerError(_err_msg) from e - def finalize_update_switch_boot(self): """Finalize switch boot and use boot files from current booted slot.""" # NOTE: since we have not yet switched boot, the active/standby relationship is @@ -769,9 +773,19 @@ def _update_fstab(self, *, active_slot_fstab: Path, standby_slot_fstab: Path): Override existed entries in standby fstab, merge new entries from active fstab. """ - standby_uuid_str = CMDHelperFuncs.get_uuid_str_by_dev( - self._boot_control.standby_root_dev - ) + try: + standby_uuid = CMDHelperFuncs.get_attrs_by_dev( + "UUID", self._boot_control.standby_root_dev + ) + assert standby_uuid + except Exception as e: + _err_msg = ( + f"failed to get UUID of {self._boot_control.standby_root_dev}: {e!r}" + ) + logger.error(_err_msg) + raise _GrubBootControllerError(_err_msg) from e + + standby_uuid_str = f"UUID={standby_uuid}" fstab_entry_pa = re.compile( r"^\s*(?P[^# ]*)\s+" r"(?P[^ ]*)\s+" @@ -869,7 +883,7 @@ def pre_update(self, version: str, *, standby_as_ref: bool, erase_standby=False) self._ota_status_control.pre_update_current() ### mount slots ### - self._boot_control.prepare_standby_dev(erase_standby=erase_standby) + self._mp_control.prepare_standby_dev(erase_standby=erase_standby) self._mp_control.mount_standby() self._mp_control.mount_active() diff --git a/otaclient/app/boot_control/_rpi_boot.py b/otaclient/app/boot_control/_rpi_boot.py index 46c4a2bdd..69c3e2abd 100644 --- a/otaclient/app/boot_control/_rpi_boot.py +++ b/otaclient/app/boot_control/_rpi_boot.py @@ -14,6 +14,7 @@ """Boot control support for Raspberry pi 4 Model B.""" +from __future__ import annotations import logging import os import re @@ -23,7 +24,7 @@ from .. import errors as ota_errors from ..proto import wrapper -from ..common import replace_atomic, subprocess_call +from ..common import replace_atomic, subprocess_call, subprocess_check_output from ._common import ( OTAStatusFilesControl, @@ -77,9 +78,8 @@ class _RPIBootControl: def __init__(self) -> None: self.system_boot_path = Path(cfg.SYSTEM_BOOT_MOUNT_POINT) - if not ( - self.system_boot_path.is_dir() - and CMDHelperFuncs.is_target_mounted(self.system_boot_path) + if not CMDHelperFuncs.is_target_mounted( + self.system_boot_path, raise_exception=False ): _err_msg = "system-boot is not presented or not mounted!" logger.error(_err_msg) @@ -92,21 +92,22 @@ def _init_slots_info(self): logger.debug("checking and initializing slots info...") try: # detect active slot - self._active_slot_dev = CMDHelperFuncs.get_dev_by_mount_point( - cfg.ACTIVE_ROOTFS_PATH + _active_slot_dev = CMDHelperFuncs.get_current_rootfs_dev() + assert _active_slot_dev + self._active_slot_dev = _active_slot_dev + + _active_slot = CMDHelperFuncs.get_attrs_by_dev( + "LABEL", str(self._active_slot_dev) ) - if not ( - _active_slot := CMDHelperFuncs.get_fslabel_by_dev(self._active_slot_dev) - ): - raise ValueError( - f"failed to get slot_id(fslabel) for active slot dev({self._active_slot_dev})" - ) + assert _active_slot self._active_slot = _active_slot # detect standby slot # NOTE: using the similar logic like grub, detect the silibing dev # of the active slot as standby slot - _parent = CMDHelperFuncs.get_parent_dev(self._active_slot_dev) + _parent = CMDHelperFuncs.get_parent_dev(str(self._active_slot_dev)) + assert _parent + # list children device file from parent device # exclude parent dev(always in the front) # expected raw result from lsblk: @@ -114,7 +115,11 @@ def _init_slots_info(self): # NAME="/dev/sdx1" # system-boot # NAME="/dev/sdx2" # slot_a # NAME="/dev/sdx3" # slot_b - _raw_child_partitions = CMDHelperFuncs._lsblk(f"-Pp -o NAME {_parent}") + _check_dev_family_cmd = ["lsblk", "-Ppo", "NAME", _parent] + _raw_child_partitions = subprocess_check_output( + _check_dev_family_cmd, raise_exception=True + ) + try: # NOTE: exclude the first 2 lines(parent and system-boot) _child_partitions = list( @@ -315,38 +320,12 @@ def finalize_switching_boot(self) -> bool: # reboot to the same slot to apply the new firmware logger.info("reboot to apply new firmware...") - CMDHelperFuncs.reboot() - - # NOTE(20230614): after calling reboot, otaclient SHOULD be terminated or exception raised - # on failed reboot command subprocess call. But if somehow it doesn't, - # it should be treated as failure and return False here. - return False - + CMDHelperFuncs.reboot() # this function will make otaclient exit immediately except Exception as e: _err_msg = f"failed to finalize boot switching: {e!r}" logger.error(_err_msg) return False - def prepare_standby_dev(self, *, erase_standby: bool): - # try umount and dev - if CMDHelperFuncs.is_target_mounted(self.standby_slot_dev): - CMDHelperFuncs.umount(self.standby_slot_dev) - try: - if erase_standby: - CMDHelperFuncs.mkfs_ext4( - self.standby_slot_dev, - fslabel=self.standby_slot, - ) - else: - # TODO: check the standby file system status - # if not erase the standby slot - # set the standby file system label with standby slot id - CMDHelperFuncs.set_dev_fslabel(self.active_slot_dev, self.standby_slot) - except Exception as e: - _err_msg = f"failed to prepare standby dev: {e!r}" - logger.error(_err_msg) - raise _RPIBootControllerError(_err_msg) from e - def prepare_tryboot_txt(self): """Copy the standby slot's config.txt as tryboot.txt.""" logger.debug("prepare tryboot.txt...") @@ -364,8 +343,7 @@ def reboot_tryboot(self): """Reboot with tryboot flag.""" logger.info(f"tryboot reboot to standby slot({self.standby_slot})...") try: - _cmd = "reboot '0 tryboot'" - subprocess_call(_cmd, raise_exception=True) + CMDHelperFuncs.reboot(args=["0 tryboot"]) except Exception as e: _err_msg = "failed to reboot" logger.exception(_err_msg) @@ -496,7 +474,10 @@ def pre_update(self, version: str, *, standby_as_ref: bool, erase_standby: bool) self._ota_status_control.pre_update_current() ### mount slots ### - self._rpiboot_control.prepare_standby_dev(erase_standby=erase_standby) + self._mp_control.prepare_standby_dev( + erase_standby=erase_standby, + fslabel=self._rpiboot_control.standby_slot, + ) self._mp_control.mount_standby() self._mp_control.mount_active() diff --git a/otaclient/app/boot_control/selecter.py b/otaclient/app/boot_control/selecter.py index 369e2835f..93ee5190a 100644 --- a/otaclient/app/boot_control/selecter.py +++ b/otaclient/app/boot_control/selecter.py @@ -20,7 +20,6 @@ from typing_extensions import deprecated from .configs import BootloaderType -from ._errors import BootControlError from .protocol import BootControllerProtocol from ..common import read_str_from_file @@ -82,13 +81,9 @@ def get_boot_controller( from ._grub import GrubController return GrubController - if bootloader_type == BootloaderType.CBOOT: - from ._cboot import CBootController - - return CBootController if bootloader_type == BootloaderType.RPI_BOOT: from ._rpi_boot import RPIBootController return RPIBootController - raise BootControlError from NotImplementedError(f"unsupported: {bootloader_type=}") + raise NotImplementedError(f"unsupported: {bootloader_type=}") diff --git a/otaclient/app/common.py b/otaclient/app/common.py index 4355f48c2..536cbcb7f 100644 --- a/otaclient/app/common.py +++ b/otaclient/app/common.py @@ -118,45 +118,99 @@ def write_str_to_file_sync(path: Union[Path, str], input: str): os.fsync(f.fileno()) -# wrapped subprocess call -def subprocess_call(cmd: str, *, raise_exception=False): +def subprocess_run_wrapper( + cmd: str | list[str], + *, + check: bool, + check_output: bool, + timeout: Optional[float] = None, +) -> subprocess.CompletedProcess[bytes]: + """A wrapper for subprocess.run method. + + NOTE: this is for the requirement of customized subprocess call + in the future, like chroot or nsenter before execution. + + Args: + cmd (str | list[str]): command to be executed. + check (bool): if True, raise CalledProcessError on non 0 return code. + check_output (bool): if True, the UTF-8 decoded stdout will be returned. + timeout (Optional[float], optional): timeout for execution. Defaults to None. + + Returns: + subprocess.CompletedProcess[bytes]: the result of the execution. """ + if isinstance(cmd, str): + cmd = shlex.split(cmd) + + return subprocess.run( + cmd, + check=check, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE if check_output else None, + timeout=timeout, + ) - Raises: - a ValueError containing information about the failure. + +def subprocess_check_output( + cmd: str | list[str], + *, + raise_exception: bool = False, + default: str = "", + timeout: Optional[float] = None, +) -> str: + """Run the and return UTF-8 decoded stripped stdout. + + Args: + cmd (str | list[str]): command to be executed. + raise_exception (bool, optional): raise the underlying CalledProcessError. Defaults to False. + default (str, optional): if is False, return on underlying + subprocess call failed. Defaults to "". + timeout (Optional[float], optional): timeout for execution. Defaults to None. + + Returns: + str: UTF-8 decoded stripped stdout. """ try: - # NOTE: we need to check the stderr and stdout when error occurs, - # so use subprocess.run here instead of subprocess.check_call - subprocess.run( - shlex.split(cmd), - check=True, - capture_output=True, + res = subprocess_run_wrapper( + cmd, check=True, check_output=True, timeout=timeout ) + return res.stdout.decode().strip() except subprocess.CalledProcessError as e: - msg = f"command({cmd=}) failed({e.returncode=}, {e.stderr=}, {e.stdout=})" - logger.debug(msg) + _err_msg = ( + f"command({cmd=}) failed(retcode={e.returncode}: \n" + f"stderr={e.stderr.decode()}" + ) + logger.debug(_err_msg) + if raise_exception: raise + return default -def subprocess_check_output(cmd: str, *, raise_exception=False, default="") -> str: - """ - Raises: - a ValueError containing information about the failure. +def subprocess_call( + cmd: str | list[str], + *, + raise_exception: bool = False, + timeout: Optional[float] = None, +) -> None: + """Run the . + + Args: + cmd (str | list[str]): command to be executed. + raise_exception (bool, optional): raise the underlying CalledProcessError. Defaults to False. + timeout (Optional[float], optional): timeout for execution. Defaults to None. """ try: - return ( - subprocess.check_output(shlex.split(cmd), stderr=subprocess.PIPE) - .decode() - .strip() - ) + subprocess_run_wrapper(cmd, check=True, check_output=False, timeout=timeout) except subprocess.CalledProcessError as e: - msg = f"command({cmd=}) failed({e.returncode=}, {e.stderr=}, {e.stdout=})" - logger.debug(msg) + _err_msg = ( + f"command({cmd=}) failed(retcode={e.returncode}: \n" + f"stderr={e.stderr.decode()}" + ) + logger.debug(_err_msg) + if raise_exception: raise - return default def copy_stat(src: Union[Path, str], dst: Union[Path, str]): @@ -169,6 +223,8 @@ def copy_stat(src: Union[Path, str], dst: Union[Path, str]): def copytree_identical(src: Path, dst: Path): """Recursively copy from the src folder to dst folder. + Source folder MUST be a dir. + This function populate files/dirs from the src to the dst, and make sure the dst is identical to the src. @@ -189,8 +245,13 @@ def copytree_identical(src: Path, dst: Path): NOTE: is_file/is_dir also returns True if it is a symlink and the link target is_file/is_dir """ + if src.is_symlink() or not src.is_dir(): + raise ValueError(f"{src} is not a dir") + if dst.is_symlink() or not dst.is_dir(): - raise FileNotFoundError(f"{dst} is not found or not a dir") + logger.info(f"{dst=} doesn't exist or not a dir, cleanup and mkdir") + dst.unlink(missing_ok=True) # unlink doesn't follow the symlink + dst.mkdir(mode=src.stat().st_mode, parents=True) # phase1: populate files to the dst for cur_dir, dirs, files in os.walk(src, topdown=True, followlinks=False): @@ -502,7 +563,7 @@ def shutdown(self, *, raise_last_exc: bool): logger.debug("shutdown retry task map") if self._running_inst: self._running_inst.shutdown(raise_last_exc=raise_last_exc) - # NOTE: passthrough the exception from underlaying running_inst + # NOTE: passthrough the exception from underlying running_inst finally: self._running_inst = None self._executor.shutdown(wait=True) diff --git a/otaclient/app/ota_client_stub.py b/otaclient/app/ota_client_stub.py index 2435b6c3a..d912077c5 100644 --- a/otaclient/app/ota_client_stub.py +++ b/otaclient/app/ota_client_stub.py @@ -98,20 +98,27 @@ def _subprocess_init(self): def _mount_external_cache_storage(self): # detect cache_dev on every startup - _cache_dev = CMDHelperFuncs._findfs("LABEL", self._external_cache_dev_fslabel) + _cache_dev = CMDHelperFuncs.get_dev_by_token( + "LABEL", + self._external_cache_dev_fslabel, + raise_exception=False, + ) if not _cache_dev: return + if len(_cache_dev) > 1: + logger.warning( + f"multiple external cache storage device found, use the first one: {_cache_dev[0]}" + ) + _cache_dev = _cache_dev[0] + self.logger.info(f"external cache dev detected at {_cache_dev}") self._external_cache_dev = _cache_dev # try to unmount the mount_point and cache_dev unconditionally _mp = Path(self._external_cache_dev_mp) - CMDHelperFuncs.umount(_cache_dev, ignore_error=True) - if _mp.is_dir(): - CMDHelperFuncs.umount(self._external_cache_dev_mp, ignore_error=True) - else: - _mp.mkdir(parents=True, exist_ok=True) + CMDHelperFuncs.umount(_cache_dev, raise_exception=False) + _mp.mkdir(parents=True, exist_ok=True) # try to mount cache_dev ro try: diff --git a/otaclient/app/proto/README.md b/otaclient/app/proto/README.md index b1a177efa..4aab4ca71 100644 --- a/otaclient/app/proto/README.md +++ b/otaclient/app/proto/README.md @@ -143,7 +143,7 @@ After the compiled python code is ready, we can define the wrappers accordingly # type hints for each attributes when manually create instances # of wrapper types. # NOTE: this __init__ is just for typing use, it will not be - # used by the underlaying MessageWrapper + # used by the underlying MessageWrapper # NOTE: if pyi file is also generated when compiling the proto file, # the __init__ in pyi file can be used directly with little adjustment def __init__( diff --git a/tests/test_boot_control/test_cboot.py b/tests/test_boot_control/test_cboot.py deleted file mode 100644 index f0d7d8bc3..000000000 --- a/tests/test_boot_control/test_cboot.py +++ /dev/null @@ -1,308 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -r""" -CBOOT switch boot mechanism follow(normal successful case): -* Assume we are at slot-0, and apply an OTA update - -condition before OTA update: - current slot: 0, ota_status=SUCCESS, slot_in_use=0 - standby slot: 1 - -pre-update: - 1. store current ota_status(=FAILURE) - 2. store current slot_in_use(=1) - 3. set standby_slot unbootable - 4. prepare and mount standby(params: standby_rootfs_dev, erase_standby) - 5. mount refroot(params: standby_rootfs_dev, current_rootfs_dev, standby_as_ref) - 6. store standby ota_status(=UPDATING) - 7. store standby slot_in_use(=1), standby_version -post-update: - 1. update extlinux_cfg file - 2. (if external_rootfs_enabled) populate boot folder to bootdev - 3. umount all - 4. switch boot -first-reboot -init boot controller - 1. makr current slot boot successful - -condition after OTA update: - current slot: 1, ota_status=SUCCESS, slot_in_use=1 - standby slot: 0 -""" -from functools import partial -from pathlib import Path -import shutil -import typing -import pytest -import pytest_mock - -import logging - -from tests.utils import SlotMeta, compare_dir -from tests.conftest import TestConfiguration as cfg - -logger = logging.getLogger(__name__) - - -class CbootFSM: - def __init__(self) -> None: - self.current_slot = cfg.SLOT_A_ID_CBOOT - self.standby_slot = cfg.SLOT_B_ID_CBOOT - self.current_slot_bootable = True - self.standby_slot_bootable = True - - self.is_boot_switched = False - - def get_current_slot(self): - return self.current_slot - - def get_standby_slot(self): - return self.standby_slot - - def get_standby_partuuid_str(self): - if self.standby_slot == cfg.SLOT_B_ID_CBOOT: - return f"PARTUUID={cfg.SLOT_B_PARTUUID}" - else: - return f"PARTUUID={cfg.SLOT_A_PARTUUID}" - - def is_current_slot_bootable(self): - return self.current_slot_bootable - - def is_standby_slot_bootable(self): - return self.standby_slot_bootable - - def mark_current_slot_as(self, bootable: bool): - self.current_slot_bootable = bootable - - def mark_standby_slot_as(self, bootable: bool): - self.standby_slot_bootable = bootable - - def switch_boot(self): - self.current_slot, self.standby_slot = self.standby_slot, self.current_slot - self.current_slot_bootable, self.standby_slot_bootable = ( - self.standby_slot_bootable, - self.current_slot_bootable, - ) - self.is_boot_switched = True - - -class TestCBootControl: - EXTLNUX_CFG_SLOT_A = Path(__file__).parent / "extlinux.conf_slot_a" - EXTLNUX_CFG_SLOT_B = Path(__file__).parent / "extlinux.conf_slot_b" - - def cfg_for_slot_a_as_current(self): - """ - NOTE: we always only refer to ota-status dir at the rootfs! - """ - from otaclient.app.boot_control.configs import CBootControlConfig - - _mocked_cboot_cfg = CBootControlConfig() - _mocked_cboot_cfg.MOUNT_POINT = str(self.slot_b) # type: ignore - _mocked_cboot_cfg.ACTIVE_ROOT_MOUNT_POINT = str(self.slot_a) # type: ignore - # NOTE: SEPARATE_BOOT_MOUNT_POINT is the root of the boot device! - _mocked_cboot_cfg.SEPARATE_BOOT_MOUNT_POINT = str(self.slot_b_boot_dev) - _mocked_cboot_cfg.ACTIVE_ROOTFS_PATH = str(self.slot_a) # type: ignore - return _mocked_cboot_cfg - - def cfg_for_slot_b_as_current(self): - from otaclient.app.boot_control.configs import CBootControlConfig - - _mocked_cboot_cfg = CBootControlConfig() - _mocked_cboot_cfg.MOUNT_POINT = str(self.slot_a) # type: ignore - _mocked_cboot_cfg.ACTIVE_ROOT_MOUNT_POINT = str(self.slot_b) # type: ignore - # NOTE: SEPARATE_BOOT_MOUNT_POINT is the root of the boot device! - _mocked_cboot_cfg.SEPARATE_BOOT_MOUNT_POINT = str(self.slot_a_boot_dev) - _mocked_cboot_cfg.ACTIVE_ROOTFS_PATH = str(self.slot_b) # type: ignore - return _mocked_cboot_cfg - - @pytest.fixture - def cboot_ab_slot(self, ab_slots: SlotMeta): - """ - TODO: not considering rootfs on internal storage now - boot folder structure for cboot: - boot_dir_{slot_a, slot_b}/ - ota-status/ - status - version - slot_in_use - """ - self.slot_a = Path(ab_slots.slot_a) - self.slot_b = Path(ab_slots.slot_b) - self.slot_a_boot_dev = Path(ab_slots.slot_a_boot_dev) - self.slot_b_boot_dev = Path(ab_slots.slot_b_boot_dev) - self.slot_a_uuid = cfg.SLOT_A_PARTUUID - self.slot_b_uuid = cfg.SLOT_B_PARTUUID - - # prepare ota_status dir for slot_a - self.slot_a_ota_status_dir = self.slot_a / Path(cfg.OTA_STATUS_DIR).relative_to( - "/" - ) - self.slot_a_ota_status_dir.mkdir(parents=True) - slot_a_ota_status = self.slot_a_ota_status_dir / "status" - slot_a_ota_status.write_text("SUCCESS") - slot_a_version = self.slot_a_ota_status_dir / "version" - slot_a_version.write_text(cfg.CURRENT_VERSION) - slot_a_slot_in_use = self.slot_a_ota_status_dir / "slot_in_use" - slot_a_slot_in_use.write_text(cfg.SLOT_A_ID_CBOOT) - # also prepare a copy of boot folder to rootfs - shutil.copytree( - self.slot_a_boot_dev / Path(cfg.BOOT_DIR).relative_to("/"), - self.slot_a / Path(cfg.BOOT_DIR).relative_to("/"), - dirs_exist_ok=True, - ) - - # prepare extlinux file - extlinux_dir = self.slot_a / "boot/extlinux" - extlinux_dir.mkdir() - extlinux_cfg = extlinux_dir / "extlinux.conf" - extlinux_cfg.write_text(self.EXTLNUX_CFG_SLOT_A.read_text()) - - # ota_status dir for slot_b(separate boot dev) - self.slot_b_ota_status_dir = self.slot_b / Path(cfg.OTA_STATUS_DIR).relative_to( - "/" - ) - - @pytest.fixture - def mock_setup( - self, - mocker: pytest_mock.MockerFixture, - cboot_ab_slot, - ): - from otaclient.app.boot_control._cboot import _CBootControl - from otaclient.app.boot_control._common import CMDHelperFuncs - - ###### start fsm ###### - self._fsm = CbootFSM() - - ###### mocking _CBootControl ###### - _CBootControl_mock = typing.cast( - _CBootControl, mocker.MagicMock(spec=_CBootControl) - ) - # mock methods - _CBootControl_mock.get_current_slot = mocker.MagicMock( - wraps=self._fsm.get_current_slot - ) - _CBootControl_mock.get_standby_slot = mocker.MagicMock( - wraps=self._fsm.get_standby_slot - ) - _CBootControl_mock.get_standby_rootfs_partuuid_str = mocker.MagicMock( - wraps=self._fsm.get_standby_partuuid_str - ) - _CBootControl_mock.mark_current_slot_boot_successful.side_effect = partial( - self._fsm.mark_current_slot_as, True - ) - _CBootControl_mock.set_standby_slot_unbootable.side_effect = partial( - self._fsm.mark_standby_slot_as, False - ) - _CBootControl_mock.switch_boot.side_effect = self._fsm.switch_boot - _CBootControl_mock.is_current_slot_marked_successful = mocker.MagicMock( - wraps=self._fsm.is_current_slot_bootable - ) - # NOTE: we only test external rootfs - _CBootControl_mock.is_external_rootfs_enabled.return_value = True - # make update_extlinux_cfg as it - _CBootControl_mock.update_extlinux_cfg = mocker.MagicMock( - wraps=_CBootControl.update_extlinux_cfg - ) - - ###### mocking _CMDHelper ###### - _CMDHelper_mock = typing.cast( - CMDHelperFuncs, mocker.MagicMock(spec=CMDHelperFuncs) - ) - - ###### patching ###### - # patch _CBootControl - _CBootControl_path = f"{cfg.CBOOT_MODULE_PATH}._CBootControl" - mocker.patch(_CBootControl_path, return_value=_CBootControl_mock) - # patch CMDHelperFuncs - # NOTE: also remember to patch CMDHelperFuncs in common - mocker.patch(f"{cfg.CBOOT_MODULE_PATH}.CMDHelperFuncs", _CMDHelper_mock) - mocker.patch( - f"{cfg.BOOT_CONTROL_COMMON_MODULE_PATH}.CMDHelperFuncs", _CMDHelper_mock - ) - mocker.patch(f"{cfg.CBOOT_MODULE_PATH}.Nvbootctrl") - - ###### binding mocked object to test instance ###### - self._CBootControl_mock = _CBootControl_mock - self._CMDHelper_mock = _CMDHelper_mock - - def test_cboot_normal_update(self, mocker: pytest_mock.MockerFixture, mock_setup): - from otaclient.app.boot_control._cboot import CBootController - - _cfg_patch_path = f"{cfg.CBOOT_MODULE_PATH}.cfg" - _relative_ota_status_path = Path(cfg.OTA_STATUS_DIR).relative_to("/") - - ###### stage 1 ###### - mocker.patch(_cfg_patch_path, self.cfg_for_slot_a_as_current()) - logger.info("init cboot controller...") - cboot_controller = CBootController() - assert ( - self.slot_a / _relative_ota_status_path / "status" - ).read_text() == "SUCCESS" - - # test pre-update - cboot_controller.pre_update( - version=cfg.UPDATE_VERSION, - standby_as_ref=False, # NOTE: not used - erase_standby=False, # NOTE: not used - ) - # assert current slot ota-status - assert ( - self.slot_a / _relative_ota_status_path / "status" - ).read_text() == "FAILURE" - assert ( - self.slot_a / "boot/ota-status/slot_in_use" - ).read_text() == self._fsm.get_standby_slot() - # assert standby slot ota-status - assert ( - self.slot_b / _relative_ota_status_path / "status" - ).read_text() == "UPDATING" - assert ( - self.slot_b / _relative_ota_status_path / "version" - ).read_text() == cfg.UPDATE_VERSION - assert ( - self.slot_b / _relative_ota_status_path / "slot_in_use" - ).read_text() == self._fsm.get_standby_slot() - - logger.info("pre-update completed, entering post-update...") - # NOTE: standby slot's extlinux file is not yet populated(done by create_standby) - # prepare it by ourself - # NOTE 2: populate to standby rootfs' boot folder - standby_extlinux_dir = self.slot_b / "boot/extlinux" - standby_extlinux_dir.mkdir() - standby_extlinux_file = standby_extlinux_dir / "extlinux.conf" - standby_extlinux_file.write_text(self.EXTLNUX_CFG_SLOT_A.read_text()) - - # test post-update - _post_updater = cboot_controller.post_update() - next(_post_updater) - next(_post_updater, None) - assert ( - self.slot_b_boot_dev / "boot/extlinux/extlinux.conf" - ).read_text() == self.EXTLNUX_CFG_SLOT_B.read_text() - self._CBootControl_mock.switch_boot.assert_called_once() - self._CMDHelper_mock.reboot.assert_called_once() - # assert separate bootdev is populated correctly - compare_dir(self.slot_b / "boot", self.slot_b_boot_dev / "boot") - - ###### stage 2 ###### - logger.info("post-update completed, test init after first reboot...") - mocker.patch(_cfg_patch_path, self.cfg_for_slot_b_as_current()) - cboot_controller = CBootController() - assert ( - self.slot_b / _relative_ota_status_path / "status" - ).read_text() == "SUCCESS" - assert self._fsm.is_boot_switched diff --git a/tests/test_boot_control/test_grub.py b/tests/test_boot_control/test_grub.py index 180a5d6a1..0c70ce1ea 100644 --- a/tests/test_boot_control/test_grub.py +++ b/tests/test_boot_control/test_grub.py @@ -35,8 +35,8 @@ def __init__(self, slot_a_mp, slot_b_mp) -> None: self._standby_slot = cfg.SLOT_B_ID_GRUB self._current_slot_mp = Path(slot_a_mp) self._standby_slot_mp = Path(slot_b_mp) - self._current_slot_dev_uuid = f"UUID={cfg.SLOT_A_UUID}" - self._standby_slot_dev_uuid = f"UUID={cfg.SLOT_B_UUID}" + self._current_slot_dev_uuid = cfg.SLOT_A_UUID + self._standby_slot_dev_uuid = cfg.SLOT_B_UUID self.current_slot_bootable = True self.standby_slot_bootable = True @@ -63,7 +63,7 @@ def get_standby_slot_mp(self) -> Path: def get_standby_boot_dir(self) -> Path: return self._standby_slot_mp / "boot" - def get_uuid_str_by_dev(self, dev: str): + def get_attrs_by_dev(self, attr: str, dev: str) -> str: if dev == self.get_standby_slot_dev(): return self._standby_slot_dev_uuid else: @@ -271,8 +271,8 @@ def mock_setup( CMDHelperFuncs, mocker.MagicMock(spec=CMDHelperFuncs) ) _CMDHelper_mock.reboot.side_effect = self._fsm.switch_boot - _CMDHelper_mock.get_uuid_str_by_dev = mocker.MagicMock( - wraps=self._fsm.get_uuid_str_by_dev + _CMDHelper_mock.get_attrs_by_dev = mocker.MagicMock( + wraps=self._fsm.get_attrs_by_dev ) # bind the mocker to the test instance self._CMDHelper_mock = _CMDHelper_mock diff --git a/tests/test_common.py b/tests/test_common.py index 534e311da..850a865a8 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -19,7 +19,6 @@ import pytest import random import logging -from concurrent.futures import ThreadPoolExecutor from functools import partial from hashlib import sha256 from multiprocessing import Process @@ -38,11 +37,10 @@ subprocess_call, subprocess_check_output, verify_file, - wait_with_backoff, write_str_to_file_sync, ) from tests.utils import compare_dir -from tests.conftest import cfg, run_http_server +from tests.conftest import run_http_server logger = logging.getLogger(__name__) @@ -97,39 +95,6 @@ def test_write_to_file_sync(tmp_path: Path): assert _path.read_text() == _TEST_FILE_CONTENT -def test_subprocess_call(): - with pytest.raises(subprocess.CalledProcessError) as e: - subprocess_call("ls /non-existed", raise_exception=True) - _origin_e = e.value - assert _origin_e.returncode == 2 - assert ( - _origin_e.stderr.decode().strip() - == "ls: cannot access '/non-existed': No such file or directory" - ) - - subprocess_call("ls /non-existed", raise_exception=False) - - -def test_subprocess_check_output(file_t: Tuple[str, str, int]): - _path, _, _ = file_t - with pytest.raises(subprocess.CalledProcessError) as e: - subprocess_check_output("cat /non-existed", raise_exception=True) - _origin_e = e.value - assert _origin_e.returncode == 1 - assert ( - _origin_e.stderr.decode().strip() - == "cat: /non-existed: No such file or directory" - ) - - assert ( - subprocess_check_output( - "cat /non-existed", raise_exception=False, default="abc" - ) - == "abc" - ) - assert subprocess_check_output(f"cat {_path}") == _TEST_FILE_CONTENT - - class Test_copytree_identical: @pytest.fixture(autouse=True) def setup(self, tmp_path: Path): @@ -266,7 +231,7 @@ def test_symlink_to_directory(self, tmp_path: Path): class _RetryTaskMapTestErr(Exception): - ... + """""" class TestRetryTaskMap: @@ -445,3 +410,58 @@ def test_probing_delayed_online_server(self, subprocess_launch_server): ) # probing should cost at least seconds assert int(time.time()) >= start_time + self.LAUNCH_DELAY + + +class TestSubprocessCall: + TEST_FILE_CONTENTS = "test file contents" + + @pytest.fixture(autouse=True) + def setup_test(self, tmp_path: Path): + test_file = tmp_path / "test_file" + test_file.write_text(self.TEST_FILE_CONTENTS) + self.existed_file = test_file + self.non_existed_file = tmp_path / "non_existed_file" + + def test_subprocess_call_failed(self): + cmd = ["ls", str(self.non_existed_file)] + with pytest.raises(subprocess.CalledProcessError) as e: + subprocess_call(cmd, raise_exception=True) + origin_exc: subprocess.CalledProcessError = e.value + + assert origin_exc.returncode == 2 + assert ( + origin_exc.stderr.decode().strip() + == f"ls: cannot access '{self.non_existed_file}': No such file or directory" + ) + + # test exception supressed + subprocess_call(cmd, raise_exception=False) + + def test_subprocess_call_succeeded(self): + cmd = ["ls", str(self.existed_file)] + subprocess_call(cmd, raise_exception=True) + + def test_subprocess_check_output_failed(self): + cmd = ["cat", str(self.non_existed_file)] + + with pytest.raises(subprocess.CalledProcessError) as e: + subprocess_check_output(cmd, raise_exception=True) + origin_exc: subprocess.CalledProcessError = e.value + assert origin_exc.returncode == 1 + assert ( + origin_exc.stderr.decode().strip() + == f"cat: {self.non_existed_file}: No such file or directory" + ) + + # test exception supressed + default_value = "test default_value" + output = subprocess_check_output( + cmd, default=default_value, raise_exception=False + ) + assert output == default_value + + def test_subprocess_check_output_succeeded(self): + cmd = ["cat", str(self.existed_file)] + + output = subprocess_check_output(cmd, raise_exception=True) + assert output == self.TEST_FILE_CONTENTS