diff --git a/otaclient/app/boot_control/_cboot.py b/otaclient/app/boot_control/_cboot.py deleted file mode 100644 index 38a3634a0..000000000 --- a/otaclient/app/boot_control/_cboot.py +++ /dev/null @@ -1,579 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import logging -import os -import re -from pathlib import Path -from functools import partial -from subprocess import CalledProcessError -from typing import Generator, Optional - - -from .. import errors as ota_errors -from ..common import ( - copytree_identical, - read_str_from_file, - subprocess_call, - subprocess_check_output, - write_str_to_file_sync, -) - -from ..proto import wrapper - -from ._common import ( - OTAStatusMixin, - PrepareMountMixin, - CMDHelperFuncs, - SlotInUseMixin, - VersionControlMixin, -) -from .configs import cboot_cfg as cfg -from .protocol import BootControllerProtocol -from .firmware import Firmware - - -logger = logging.getLogger(__name__) - - -class NvbootctrlError(Exception): - """Specific internal errors related to nvbootctrl cmd.""" - - -class Nvbootctrl: - """ - NOTE: slot and rootfs are binding accordingly! - partid mapping: p1->slot0, p2->slot1 - - slot num: 0->A, 1->B - """ - - EMMC_DEV: str = "mmcblk0" - NVME_DEV: str = "nvme0n1" - # slot0<->slot1 - CURRENT_STANDBY_FLIP = {"0": "1", "1": "0"} - # p1->slot0, p2->slot1 - PARTID_SLOTID_MAP = {"1": "0", "2": "1"} - # slot0->p1, slot1->p2 - SLOTID_PARTID_MAP = {v: k for k, v in PARTID_SLOTID_MAP.items()} - - # nvbootctrl - @staticmethod - def _nvbootctrl( - arg: str, *, target="rootfs", call_only=True, raise_exception=True - ) -> Optional[str]: - """ - Raises: - NvbootCtrlError if raise_exception is True. - """ - _cmd = f"nvbootctrl -t {target} {arg}" - try: - if call_only: - subprocess_call(_cmd, raise_exception=raise_exception) - return - else: - return subprocess_check_output(_cmd, raise_exception=raise_exception) - except CalledProcessError as e: - raise NvbootctrlError from e - - @classmethod - def check_rootdev(cls, dev: str) -> bool: - """ - check whether the givin dev is legal root dev or not - - NOTE: expect using UUID method to assign rootfs! - """ - pa = re.compile(r"\broot=(?P[\w=-]*)\b") - try: - if ma := pa.search(subprocess_check_output("cat /proc/cmdline")): - res = ma.group("rdev") - else: - raise ValueError(f"failed to find specific {dev} in cmdline") - except (CalledProcessError, ValueError) as e: - raise NvbootctrlError( - "rootfs detect failed or rootfs is not specified by PARTUUID in kernel cmdline" - ) from e - - uuid = res.split("=")[-1] - - return Path(CMDHelperFuncs.get_dev_by_partuuid(uuid)).resolve( - strict=True - ) == Path(dev).resolve(strict=True) - - @classmethod - def get_current_slot(cls) -> str: - if slot := cls._nvbootctrl("get-current-slot", call_only=False): - return slot - else: - raise NvbootctrlError - - @classmethod - def dump_slots_info(cls) -> Optional[str]: - return cls._nvbootctrl( - "dump-slots-info", call_only=False, raise_exception=False - ) - - @classmethod - def mark_boot_successful(cls): - """Mark current slot as boot successfully.""" - cls._nvbootctrl("mark-boot-successful") - - @classmethod - def set_active_boot_slot(cls, slot: str, target="rootfs"): - cls._nvbootctrl(f"set-active-boot-slot {slot}", target=target) - - @classmethod - def set_slot_as_unbootable(cls, slot: str): - cls._nvbootctrl(f"set-slot-as-unbootable {slot}") - - @classmethod - def is_slot_bootable(cls, slot: str) -> bool: - try: - cls._nvbootctrl(f"is-slot-bootable {slot}") - return True - except NvbootctrlError: - return False - - @classmethod - def is_slot_marked_successful(cls, slot: str) -> bool: - try: - cls._nvbootctrl(f"is-slot-marked-successful {slot}") - return True - except NvbootctrlError: - return False - - -class _CBootControl: - def __init__(self): - # NOTE: only support rqx-580, rqx-58g platform right now! - # detect the chip id - self.chip_id = read_str_from_file(cfg.TEGRA_CHIP_ID_PATH) - if not self.chip_id or int(self.chip_id) not in cfg.CHIP_ID_MODEL_MAP: - raise NotImplementedError( - f"unsupported platform found (chip_id: {self.chip_id}), abort" - ) - - self.chip_id = int(self.chip_id) - self.model = cfg.CHIP_ID_MODEL_MAP[self.chip_id] - logger.info(f"{self.model=}, (chip_id={hex(self.chip_id)})") - - # initializing dev info - self._init_dev_info() - logger.info(f"finished cboot control init: {Nvbootctrl.dump_slots_info()=}") - - def _init_dev_info(self): - self.current_slot: str = Nvbootctrl.get_current_slot() - self.current_rootfs_dev: str = CMDHelperFuncs.get_current_rootfs_dev() - # NOTE: boot dev is always emmc device now - self.current_boot_dev: str = f"/dev/{Nvbootctrl.EMMC_DEV}p{Nvbootctrl.SLOTID_PARTID_MAP[self.current_slot]}" - - self.standby_slot: str = Nvbootctrl.CURRENT_STANDBY_FLIP[self.current_slot] - standby_partid = Nvbootctrl.SLOTID_PARTID_MAP[self.standby_slot] - self.standby_boot_dev: str = f"/dev/{Nvbootctrl.EMMC_DEV}p{standby_partid}" - - # detect rootfs position - if self.current_rootfs_dev.find(Nvbootctrl.NVME_DEV) != -1: - logger.debug("rootfs on external storage detected, nvme rootfs is enable") - self.is_rootfs_on_external = True - self.standby_rootfs_dev = f"/dev/{Nvbootctrl.NVME_DEV}p{standby_partid}" - self.standby_slot_partuuid_str = CMDHelperFuncs.get_partuuid_str_by_dev( - self.standby_rootfs_dev - ) - elif self.current_rootfs_dev.find(Nvbootctrl.EMMC_DEV) != -1: - logger.debug("using internal storage as rootfs") - self.is_rootfs_on_external = False - self.standby_rootfs_dev = f"/dev/{Nvbootctrl.EMMC_DEV}p{standby_partid}" - self.standby_slot_partuuid_str = CMDHelperFuncs.get_partuuid_str_by_dev( - self.standby_rootfs_dev - ) - else: - raise NotImplementedError( - f"rootfs on {self.current_rootfs_dev} is not supported, abort" - ) - - # ensure rootfs is as expected - if not Nvbootctrl.check_rootdev(self.current_rootfs_dev): - msg = f"rootfs mismatch, expect {self.current_rootfs_dev} as rootfs" - raise NvbootctrlError(msg) - elif Nvbootctrl.check_rootdev(self.standby_rootfs_dev): - msg = ( - f"rootfs mismatch, expect {self.standby_rootfs_dev} as standby slot dev" - ) - raise NvbootctrlError(msg) - - logger.info("dev info initializing completed") - logger.info( - f"{self.current_slot=}, {self.current_boot_dev=}, {self.current_rootfs_dev=}" - ) - logger.info( - f"{self.standby_slot=}, {self.standby_boot_dev=}, {self.standby_rootfs_dev=}" - ) - - ###### CBootControl API ###### - def get_current_slot(self) -> str: - return self.current_slot - - def get_current_rootfs_dev(self) -> str: - return self.current_rootfs_dev - - def get_standby_rootfs_dev(self) -> str: - return self.standby_rootfs_dev - - def get_standby_slot(self) -> str: - return self.standby_slot - - def get_standby_rootfs_partuuid_str(self) -> str: - return self.standby_slot_partuuid_str - - def get_standby_boot_dev(self) -> str: - return self.standby_boot_dev - - def is_external_rootfs_enabled(self) -> bool: - return self.is_rootfs_on_external - - def mark_current_slot_boot_successful(self): - logger.info(f"mark {self.current_slot=} as boot successful") - Nvbootctrl.mark_boot_successful() - - def set_standby_slot_unbootable(self): - slot = self.standby_slot - Nvbootctrl.set_slot_as_unbootable(slot) - - def switch_boot(self): - slot = self.standby_slot - - logger.info(f"switch boot to {slot=}") - Nvbootctrl.set_active_boot_slot(slot, target="bootloader") - Nvbootctrl.set_active_boot_slot(slot) - - def is_current_slot_marked_successful(self) -> bool: - slot = self.current_slot - return Nvbootctrl.is_slot_marked_successful(slot) - - @staticmethod - def update_extlinux_cfg(dst: Path, ref: Path, partuuid_str: str): - """Write dst extlinux.conf based on reference extlinux.conf and partuuid_str. - - Params: - dst: path to dst extlinux.conf file - ref: reference extlinux.conf file - partuuid_str: rootfs specification string like "PARTUUID=" - """ - - def _replace(ma: re.Match, repl: str): - append_l: str = ma.group(0) - if append_l.startswith("#"): - return append_l - res, n = re.compile(r"root=[\w\-=]*").subn(repl, append_l) - if not n: - res = f"{append_l} {repl}" - - return res - - _repl_func = partial(_replace, repl=f"root={partuuid_str}") - write_str_to_file_sync( - dst, re.compile(r"\n\s*APPEND.*").sub(_repl_func, ref.read_text()) - ) - - -class CBootController( - PrepareMountMixin, - SlotInUseMixin, - OTAStatusMixin, - VersionControlMixin, - BootControllerProtocol, -): - def __init__(self) -> None: - try: - self._cboot_control: _CBootControl = _CBootControl() - - # load paths - ## first try to unmount standby dev if possible - self.standby_slot_dev = self._cboot_control.get_standby_rootfs_dev() - CMDHelperFuncs.umount(self.standby_slot_dev) - - self.standby_slot_mount_point = Path(cfg.MOUNT_POINT) - self.standby_slot_mount_point.mkdir(exist_ok=True) - - ## refroot mount point - _refroot_mount_point = cfg.ACTIVE_ROOT_MOUNT_POINT - # first try to umount refroot mount point - CMDHelperFuncs.umount(_refroot_mount_point) - if not os.path.isdir(_refroot_mount_point): - os.mkdir(_refroot_mount_point) - self.ref_slot_mount_point = Path(_refroot_mount_point) - - ## ota-status dir - ### current slot - self.current_ota_status_dir = Path(cfg.ACTIVE_ROOTFS_PATH) / Path( - cfg.OTA_STATUS_DIR - ).relative_to("/") - self.current_ota_status_dir.mkdir(parents=True, exist_ok=True) - ### standby slot - # NOTE: might not yet be populated before OTA update applied! - self.standby_ota_status_dir = self.standby_slot_mount_point / Path( - cfg.OTA_STATUS_DIR - ).relative_to("/") - - # init ota-status - self._init_boot_control() - except NotImplementedError as e: - raise ota_errors.BootControlPlatformUnsupported(module=__name__) from e - except Exception as e: - raise ota_errors.BootControlStartupFailed( - f"unspecific boot controller startup failure: {e!r}", module=__name__ - ) from e - - ###### private methods ###### - - def _init_boot_control(self): - """Init boot control and ota-status on start-up.""" - # load ota_status str and slot_in_use - _ota_status = self._load_current_ota_status() - _slot_in_use = self._load_current_slot_in_use() - current_slot = self._cboot_control.get_current_slot() - if not (_ota_status and _slot_in_use): - logger.info("initializing boot control files...") - _ota_status = wrapper.StatusOta.INITIALIZED - self._store_current_slot_in_use(current_slot) - self._store_current_ota_status(wrapper.StatusOta.INITIALIZED) - - if _ota_status in [wrapper.StatusOta.UPDATING, wrapper.StatusOta.ROLLBACKING]: - if self._is_switching_boot(): - logger.info("finalizing switching boot...") - # set the current slot(switched slot) as boot successful - self._cboot_control.mark_current_slot_boot_successful() - # switch ota_status - _ota_status = wrapper.StatusOta.SUCCESS - else: - if _ota_status == wrapper.StatusOta.ROLLBACKING: - _ota_status = wrapper.StatusOta.ROLLBACK_FAILURE - else: - _ota_status = wrapper.StatusOta.FAILURE - # status except UPDATING/ROLLBACKING remained as it - - # detect failed reboot, but only print error logging - if ( - _ota_status != wrapper.StatusOta.INITIALIZED - and _slot_in_use != current_slot - ): - logger.error( - f"boot into old slot {current_slot}, " - f"but slot_in_use indicates it should boot into {_slot_in_use}, " - "this might indicate a failed finalization at first reboot after update/rollback" - ) - - self.ota_status = _ota_status - self._store_current_ota_status(_ota_status) - logger.info(f"boot control init finished, ota_status is {_ota_status}") - - def _is_switching_boot(self) -> bool: - # evidence 1: nvbootctrl status - # the newly updated slot should not be marked as successful on the first reboot - _nvboot_res = not self._cboot_control.is_current_slot_marked_successful() - - # evidence 2: ota_status - # the newly updated/rollbacked slot should have ota-status as updating/rollback - _ota_status = self._load_current_ota_status() in [ - wrapper.StatusOta.UPDATING, - wrapper.StatusOta.ROLLBACKING, - ] - - # evidence 3: slot in use - # the slot_in_use file should have the same slot as current slot - _is_slot_in_use = ( - self._load_current_slot_in_use() == self._cboot_control.get_current_slot() - ) - - # NOTE(20230609): only check _ota_status_ and _is_slot_in_use, remove _nvboot_res check. - # as long as we are in UPDATING(_ota_status flag), - # and we should in this slot(_is_slot_in_use), then we are OK to finalize. - _is_switching_boot = _ota_status and _is_slot_in_use - logger.info( - "[switch_boot detection]\n" - f"ota_status is UPDATING in this slot: {_ota_status=}\n" - f"slot_in_use indicates we should in this slot: {_is_slot_in_use=}\n" - f"{_is_switching_boot=}" - ) - if _is_switching_boot and not _nvboot_res: - logger.warning( - f"{_ota_status=} and {_is_slot_in_use=} " - "show that we should be in finalizing switching boot stage," - f"but this slot is not marked as unbootable." - ) - return _is_switching_boot - - def _populate_boot_folder_to_separate_bootdev(self): - # mount the actual standby_boot_dev now - _boot_dir_mount_point = Path(cfg.SEPARATE_BOOT_MOUNT_POINT) - _boot_dir_mount_point.mkdir(exist_ok=True, parents=True) - - try: - CMDHelperFuncs.mount_rw( - self._cboot_control.get_standby_boot_dev(), - _boot_dir_mount_point, - ) - except Exception as e: - _msg = f"failed to mount standby boot dev: {e!r}" - logger.error(_msg) - raise NvbootctrlError(_msg) from e - - try: - dst = _boot_dir_mount_point / "boot" - dst.mkdir(exist_ok=True, parents=True) - src = self.standby_slot_mount_point / "boot" - - # copy the standby slot's boot folder to emmc boot dev - copytree_identical(src, dst) - except Exception as e: - _msg = f"failed to populate boot folder to separate bootdev: {e!r}" - logger.error(_msg) - raise NvbootctrlError(_msg) from e - finally: - # unmount standby emmc boot dev on finish/failure - try: - CMDHelperFuncs.umount(_boot_dir_mount_point) - except Exception as e: - _failure_msg = f"failed to umount boot dev: {e!r}" - logger.warning(_failure_msg) - # no need to raise to the caller - - ###### public methods ###### - # also includes methods from OTAStatusMixin, VersionControlMixin - # load_version, get_ota_status - - def on_operation_failure(self): - """Failure registering and cleanup at failure.""" - self._store_current_ota_status(wrapper.StatusOta.FAILURE) - # when standby slot is not created, otastatus is not needed to be set - if CMDHelperFuncs.is_target_mounted(self.standby_slot_mount_point): - self._store_standby_ota_status(wrapper.StatusOta.FAILURE) - - logger.warning("on failure try to unmounting standby slot...") - self._umount_all(ignore_error=True) - - def get_standby_slot_path(self) -> Path: - return self.standby_slot_mount_point - - def get_standby_boot_dir(self) -> Path: - """ - NOTE: in cboot controller, we directly use the /boot dir under the standby slot, - and sync to the external boot dev in the post_update if needed. - """ - return self.standby_slot_mount_point / "boot" - - def pre_update(self, version: str, *, standby_as_ref: bool, erase_standby=False): - try: - # store current slot status - _target_slot = self._cboot_control.get_standby_slot() - self._store_current_ota_status(wrapper.StatusOta.FAILURE) - self._store_current_slot_in_use(_target_slot) - - # setup updating - self._cboot_control.set_standby_slot_unbootable() - self._prepare_and_mount_standby( - self._cboot_control.get_standby_rootfs_dev(), - erase=erase_standby, - ) - self._mount_refroot( - standby_dev=self._cboot_control.get_standby_rootfs_dev(), - active_dev=self._cboot_control.get_current_rootfs_dev(), - standby_as_ref=standby_as_ref, - ) - - ### re-populate /boot/ota-status folder for standby slot - # create the ota-status folder unconditionally - self.standby_ota_status_dir.mkdir(exist_ok=True, parents=True) - # store status to standby slot - self._store_standby_ota_status(wrapper.StatusOta.UPDATING) - self._store_standby_version(version) - self._store_standby_slot_in_use(_target_slot) - - logger.info("pre-update setting finished") - except Exception as e: - _err_msg = f"failed on pre_update: {e!r}" - logger.exception(_err_msg) - raise ota_errors.BootControlPreUpdateFailed( - f"{e!r}", module=__name__ - ) from e - - def post_update(self) -> Generator[None, None, None]: - try: - # firmware update - firmware = Firmware( - self.standby_slot_mount_point - / Path(cfg.FIRMWARE_CONFIG).relative_to("/") - ) - firmware.update(int(self._cboot_control.get_standby_slot())) - - # update extlinux_cfg file - _extlinux_cfg = self.standby_slot_mount_point / Path( - cfg.EXTLINUX_FILE - ).relative_to("/") - self._cboot_control.update_extlinux_cfg( - dst=_extlinux_cfg, - ref=_extlinux_cfg, - partuuid_str=self._cboot_control.get_standby_rootfs_partuuid_str(), - ) - - # NOTE: we didn't prepare /boot/ota here, - # process_persistent does this for us - if self._cboot_control.is_external_rootfs_enabled(): - logger.info( - "rootfs on external storage detected: " - "updating the /boot folder in standby bootdev..." - ) - self._populate_boot_folder_to_separate_bootdev() - - logger.info("post update finished, rebooting...") - self._umount_all(ignore_error=True) - self._cboot_control.switch_boot() - - logger.info(f"[post-update]: {Nvbootctrl.dump_slots_info()=}") - yield # hand over control back to otaclient - CMDHelperFuncs.reboot() - except Exception as e: - _err_msg = f"failed on post_update: {e!r}" - logger.exception(_err_msg) - raise ota_errors.BootControlPostUpdateFailed( - _err_msg, module=__name__ - ) from e - - def pre_rollback(self): - try: - self._store_current_ota_status(wrapper.StatusOta.FAILURE) - self._prepare_and_mount_standby( - self._cboot_control.get_standby_rootfs_dev(), - erase=False, - ) - # store ROLLBACKING status to standby - self._store_standby_ota_status(wrapper.StatusOta.ROLLBACKING) - except Exception as e: - _err_msg = f"failed on pre_rollback: {e!r}" - logger.exception(_err_msg) - raise ota_errors.BootControlPreRollbackFailed( - _err_msg, module=__name__ - ) from e - - def post_rollback(self): - try: - self._cboot_control.switch_boot() - CMDHelperFuncs.reboot() - except Exception as e: - _err_msg = f"failed on post_rollback: {e!r}" - logger.exception(_err_msg) - raise ota_errors.BootControlPostRollbackFailed( - _err_msg, module=__name__ - ) from e diff --git a/otaclient/app/boot_control/_common.py b/otaclient/app/boot_control/_common.py index 0ba392b35..401d316a7 100644 --- a/otaclient/app/boot_control/_common.py +++ b/otaclient/app/boot_control/_common.py @@ -14,20 +14,13 @@ r"""Shared utils for boot_controller.""" +from __future__ import annotations import logging -import os import shutil import sys from pathlib import Path from subprocess import CalledProcessError -from typing import List, Optional, Union, Callable - -from ._errors import ( - BootControlError, - MountError, - MkfsError, - MountFailedReason, -) +from typing import Literal, Optional, Union, Callable, NoReturn from ..configs import config as cfg from ..common import ( @@ -41,350 +34,323 @@ logger = logging.getLogger(__name__) +# fmt: off +PartitionToken = Literal[ + "UUID", "PARTUUID", + "LABEL", "PARTLABEL", + "TYPE", +] +# fmt: on + class CMDHelperFuncs: - """HelperFuncs bundle for wrapped linux cmd.""" + """HelperFuncs bundle for wrapped linux cmd. - @staticmethod - @MountFailedReason.parse_failed_reason - def _mount( - dev: str, - mount_point: str, - *, - options: Optional[List[str]] = None, - args: Optional[List[str]] = None, - ): - """ - mount [-o option1[,option2, ...]]] [args[0] [args[1]...]] + When underlying subprocess call failed and is True, + functions defined in this class will raise the original exception + to the upper caller. + """ - Raises: - MountError on failed mounting. - """ - _option_str = "" - if options: - _option_str = f"-o {','.join(options)}" + @classmethod + def get_attrs_by_dev( + cls, attr: PartitionToken, dev: Path | str, *, raise_exception: bool = True + ) -> str: + """Get from . - _args_str = "" - if args: - _args_str = f"{' '.join(args)}" + This is implemented by calling: + `lsblk -in -o ` - _cmd = f"mount {_option_str} {_args_str} {dev} {mount_point}" - subprocess_call(_cmd, raise_exception=True) + Args: + attr (PartitionToken): the attribute to retrieve from the . + dev (Path | str): the target device path. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. - @staticmethod - def _findfs(key: str, value: str) -> str: - """ - findfs finds a partition by conditions - Usage: - findfs [options] {LABEL,UUID,PARTUUID,PARTLABEL}= + Returns: + str: of . """ - _cmd = f"findfs {key}={value}" - return subprocess_check_output(_cmd) - - @staticmethod - def _findmnt(args: str) -> str: - _cmd = f"findmnt {args}" - return subprocess_check_output(_cmd) + cmd = ["lsblk", "-ino", attr, str(dev)] + return subprocess_check_output(cmd, raise_exception=raise_exception) - @staticmethod - def _lsblk(args: str, *, raise_exception=False) -> str: - """lsblk command wrapper. + @classmethod + def get_dev_by_token( + cls, token: PartitionToken, value: str, *, raise_exception: bool = True + ) -> Optional[list[str]]: + """Get a list of device(s) that matches the = pair. - Default return empty str if raise_exception==False. - """ - _cmd = f"lsblk {args}" - return subprocess_check_output( - _cmd, - raise_exception=raise_exception, - default="", - ) + This is implemented by calling: + blkid -o device -t = - ###### derived helper methods ###### + Args: + token (PartitionToken): which attribute of device to match. + value (str): the value of the attribute. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. - @classmethod - def get_fslabel_by_dev(cls, dev: str) -> str: - """Return the fslabel of the dev if any or empty str.""" - args = f"-in -o LABEL {dev}" - return cls._lsblk(args) + Returns: + Optional[list[str]]: If there is at least one device found, return a list + contains all found device(s), otherwise None. + """ + cmd = ["blkid", "-o", "device", "-t", f"{token}={value}"] + if res := subprocess_check_output(cmd, raise_exception=raise_exception): + return res.splitlines() @classmethod - def get_partuuid_by_dev(cls, dev: str) -> str: - """Return partuuid of input device.""" - args = f"-in -o PARTUUID {dev}" - try: - return cls._lsblk(args, raise_exception=True) - except Exception as e: - msg = f"failed to get partuuid for {dev}: {e!r}" - raise ValueError(msg) from None + def get_current_rootfs_dev(cls, *, raise_exception: bool = True) -> str: + """Get the devpath of current rootfs dev. - @classmethod - def get_uuid_by_dev(cls, dev: str) -> str: - """Return uuid of input device.""" - args = f"-in -o UUID {dev}" - try: - return cls._lsblk(args, raise_exception=True) - except Exception as e: - msg = f"failed to get uuid for {dev}: {e!r}" - raise ValueError(msg) from None + This is implemented by calling + findmnt -nfc -o SOURCE - @classmethod - def get_uuid_str_by_dev(cls, dev: str) -> str: - """Return UUID string of input device. + Args: + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. Returns: - str like: "UUID=" + str: the devpath of current rootfs device. """ - return f"UUID={cls.get_uuid_by_dev(dev)}" + cmd = ["findmnt", "-nfco", "SOURCE", cfg.ACTIVE_ROOTFS_PATH] + return subprocess_check_output(cmd, raise_exception=raise_exception) @classmethod - def get_partuuid_str_by_dev(cls, dev: str) -> str: - """Return PARTUUID string of input device. + def get_mount_point_by_dev(cls, dev: str, *, raise_exception: bool = True) -> str: + """Get the FIRST mountpoint of the . + + This is implemented by calling: + findmnt -nfo TARGET + + NOTE: option -f is used to only show the first file system. + + Args: + dev (str): the device to check against. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. Returns: - str like: "PARTUUID=" + str: the FIRST mountpint of the , or empty string if is False + and the subprocess call failed(due to dev is not mounted or other reasons). """ - return f"PARTUUID={cls.get_partuuid_by_dev(dev)}" - - @classmethod - def get_dev_by_partlabel(cls, partlabel: str) -> str: - return cls._findfs("PARTLABEL", partlabel) + cmd = ["findmnt", "-nfo", "TARGET", dev] + return subprocess_check_output(cmd, raise_exception=raise_exception) @classmethod - def get_dev_by_fslabel(cls, fslabel: str) -> str: - return cls._findfs("LABEL", fslabel) + def get_dev_by_mount_point( + cls, mount_point: str, *, raise_exception: bool = True + ) -> str: + """Return the source dev of the given . - @classmethod - def get_dev_by_partuuid(cls, partuuid: str) -> str: - return cls._findfs("PARTUUID", partuuid) + This is implemented by calling: + findmnt -no SOURCE - @classmethod - def get_current_rootfs_dev(cls) -> str: - """ - NOTE: - -o : only print - -n: no headings - -f: only show the first file system - -c: canonicalize printed paths + Args: + mount_point (str): mount_point to check against. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. Returns: - full path to dev of the current rootfs + str: the source device of . """ - if res := cls._findmnt("/ -o SOURCE -n -f -c"): - return res - else: - raise BootControlError("failed to detect current rootfs") + cmd = ["findmnt", "-no", "SOURCE", mount_point] + return subprocess_check_output(cmd, raise_exception=raise_exception) @classmethod - def get_mount_point_by_dev(cls, dev: str, *, raise_exception=True) -> str: - """ - findmnt -o TARGET -n + def is_target_mounted( + cls, target: Path | str, *, raise_exception: bool = True + ) -> bool: + """Check if is mounted or not. can be a dev or a mount point. - NOTE: findmnt raw result might have multiple lines if target dev is bind mounted. - use option -f to only show the first file system - """ - mount_points = cls._findmnt(f"{dev} -o TARGET -n -f") - if not mount_points and raise_exception: - raise MountError(f"{dev} is not mounted") + This is implemented by calling: + findmnt - return mount_points + Args: + target (Path | str): the target to check against. Could be a device or a mount point. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. - @classmethod - def get_dev_by_mount_point(cls, mount_point: str) -> str: - """Return the underlying mounted dev of the given mount_point.""" - return cls._findmnt(f"-no SOURCE {mount_point}") + Returns: + bool: return True if the target has at least one mount_point. + """ + cmd = ["findmnt", target] + return bool(subprocess_check_output(cmd, raise_exception=raise_exception)) @classmethod - def is_target_mounted(cls, target: Union[Path, str]) -> bool: - return cls._findmnt(f"{target}") != "" + def get_parent_dev(cls, child_device: str, *, raise_exception: bool = True) -> str: + """Get the parent devpath from . - @classmethod - def get_parent_dev(cls, child_device: str) -> str: - """ When `/dev/nvme0n1p1` is specified as child_device, /dev/nvme0n1 is returned. - cmd params: - -d: print the result from specified dev only. - """ - cmd = f"-idpn -o PKNAME {child_device}" - if res := cls._lsblk(cmd): - return res - else: - raise ValueError(f"{child_device} not found or not a partition") + This function is implemented by calling: + lsblk -idpno PKNAME - @classmethod - def get_dev_family(cls, parent_device: str, *, include_parent=True) -> List[str]: - """ - When `/dev/nvme0n1` is specified as parent_device, - ["/dev/nvme0n1", "/dev/nvme0n1p1", "/dev/nvme0n1p2"...] will be return - """ - cmd = f"-Pp -o NAME {parent_device}" - res = list( - map( - lambda line: line.split("=")[-1].strip('"'), - cls._lsblk(cmd).splitlines(), - ) - ) + Args: + child_device (str): the device to find parent device from. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. - # the first line is the parent of the dev family - if include_parent: - return res - else: - return res[1:] + Returns: + str: the parent device of the specific . + """ + cmd = ["lsblk", "-idpno", "PKNAME", child_device] + return subprocess_check_output(cmd, raise_exception=raise_exception) @classmethod - def get_dev_size(cls, dev: str) -> int: - """Return the size of dev by bytes. + def set_ext4_fslabel(cls, dev: str, fslabel: str, *, raise_exception: bool = True): + """Set to ext4 formatted . + + This is implemented by calling: + e2label - NOTE: - -d: print the result from specified dev only - -b: print size in bytes - -n: no headings + Args: + dev (str): the ext4 partition device. + fslabel (str): the fslabel to be set. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. """ - cmd = f"-dbn -o SIZE {dev}" - try: - return int(cls._lsblk(cmd)) - except ValueError: - raise ValueError(f"failed to get size of {dev}") from None + cmd = ["e2label", dev, fslabel] + subprocess_call(cmd, raise_exception=raise_exception) @classmethod - def set_dev_fslabel(cls, dev: str, fslabel: str): - cmd = f"e2label {dev} {fslabel}" - try: - subprocess_call(cmd, raise_exception=True) - except Exception as e: - raise ValueError(f"failed to set {fslabel=} to {dev=}: {e!r}") from None + def mount_rw( + cls, target: str, mount_point: Path | str, *, raise_exception: bool = True + ): + """Mount the to read-write. - @classmethod - def mount_rw(cls, target: str, mount_point: Union[Path, str]): - """Mount the target to the mount_point read-write. + This is implemented by calling: + mount -o rw --make-private --make-unbindable NOTE: pass args = ["--make-private", "--make-unbindable"] to prevent mount events propagation to/from this mount point. - Raises: - MountError on failed mounting. + Args: + target (str): target to be mounted. + mount_point (Path | str): mount point to mount to. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. """ - options = ["rw"] - args = ["--make-private", "--make-unbindable"] - cls._mount(target, str(mount_point), options=options, args=args) + # fmt: off + cmd = [ + "mount", + "-o", "rw", + "--make-private", "--make-unbindable", + target, + str(mount_point), + ] + # fmt: on + subprocess_call(cmd, raise_exception=raise_exception) @classmethod - def bind_mount_ro(cls, target: str, mount_point: Union[Path, str]): - """Bind mount the target to the mount_point read-only. + def bind_mount_ro( + cls, target: str, mount_point: Path | str, *, raise_exception: bool = True + ): + """Bind mount the to read-only. - NOTE: pass args = ["--make-private", "--make-unbindable"] to prevent - mount events propagation to/from this mount point. + This is implemented by calling: + mount -o bind,ro --make-private --make-unbindable - Raises: - MountError on failed mounting. + Args: + target (str): target to be mounted. + mount_point (Path | str): mount point to mount to. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. """ - options = ["bind", "ro"] - args = ["--make-private", "--make-unbindable"] - cls._mount(target, str(mount_point), options=options, args=args) + # fmt: off + cmd = [ + "mount", + "-o", "bind,ro", + "--make-private", "--make-unbindable", + target, + str(mount_point) + ] + # fmt: on + subprocess_call(cmd, raise_exception=raise_exception) @classmethod - def umount(cls, target: Union[Path, str], *, ignore_error=False): - """Try to unmount the . + def umount(cls, target: Path | str, *, raise_exception: bool = True): + """Try to umount the . + + This is implemented by calling: + umount - Raises: - If ignore_error is False, raises MountError on failed unmounting. + Before calling umount, the will be check whether it is mounted, + if it is not mounted, this function will return directly. + + Args: + target (Path | str): target to be umounted. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. """ # first try to check whether the target(either a mount point or a dev) # is mounted - if not cls.is_target_mounted(target): + if not cls.is_target_mounted(target, raise_exception=False): return # if the target is mounted, try to unmount it. - try: - _cmd = f"umount -l {target}" - subprocess_call(_cmd, raise_exception=True) - except CalledProcessError as e: - _failure_msg = ( - f"failed to umount {target}: {e.returncode=}, {e.stderr=}, {e.stdout=}" - ) - logger.warning(_failure_msg) - - if not ignore_error: - raise MountError(_failure_msg) from None - - @classmethod - def mkfs_ext4(cls, dev: str, *, fslabel: Optional[str] = None): - """Call mkfs.ext4 on . - - Raises: - MkfsError on failed ext4 partition formatting. - """ - # NOTE: preserve the UUID and FSLABEL(if set) - _specify_uuid = "" - try: - _specify_uuid = f"-U {cls.get_uuid_by_dev(dev)}" - except Exception: - pass - - # if fslabel is specified, then use it, - # otherwise try to detect the previously set one - _specify_fslabel = "" - if fslabel: - _specify_fslabel = f"-L {fslabel}" - elif _fslabel := cls.get_fslabel_by_dev(dev): - _specify_fslabel = f"-L {_fslabel}" - - try: - logger.warning(f"format {dev} to ext4...") - _cmd = f"mkfs.ext4 {_specify_uuid} {_specify_fslabel} {dev}" - subprocess_call(_cmd, raise_exception=True) - except CalledProcessError as e: - _failure_msg = f"failed to apply mkfs.ext4 on {dev}: {e!r}" - logger.error(_failure_msg) - raise MkfsError(_failure_msg) + _cmd = ["umount", str(target)] + subprocess_call(_cmd, raise_exception=raise_exception) @classmethod - def mount_refroot( + def mkfs_ext4( cls, - standby_slot_dev: str, - active_slot_dev: str, - refroot_mount_point: str, + dev: str, *, - standby_as_ref: bool, + fslabel: Optional[str] = None, + fsuuid: Optional[str] = None, + raise_exception: bool = True, ): - """Mount reference rootfs that we copy files from to . + """Create new ext4 formatted filesystem on , optionally with + and/or . - This method bind mount refroot as ro with make-private flag and make-unbindable flag, - to prevent ANY accidental writes/changes to the refroot. - - Raises: - MountError on failed mounting. + Args: + dev (str): device to be formatted to ext4. + fslabel (Optional[str], optional): fslabel of the new ext4 filesystem. Defaults to None. + When it is None, this function will try to preserve the previous fslabel. + fsuuid (Optional[str], optional): fsuuid of the new ext4 filesystem. Defaults to None. + When it is None, this function will try to preserve the previous fsuuid. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. """ - _refroot_dev = standby_slot_dev if standby_as_ref else active_slot_dev + cmd = ["mkfs.ext4", "-F"] - # NOTE: set raise_exception to false to allow not mounted - # not mounted dev will have empty return str - if _refroot_active_mount_point := CMDHelperFuncs.get_mount_point_by_dev( - _refroot_dev, raise_exception=False - ): - CMDHelperFuncs.bind_mount_ro( - _refroot_active_mount_point, - refroot_mount_point, - ) - else: - # NOTE: refroot is expected to be mounted, - # if refroot is active rootfs, it is mounted as /; - # if refroot is standby rootfs(in-place update mode), - # it will be mounted to /mnt/standby(rw), so we still mount it as bind,ro - raise MountError("refroot is expected to be mounted") + if not fsuuid: + try: + fsuuid = cls.get_attrs_by_dev("UUID", dev) + assert fsuuid + logger.debug(f"reuse previous UUID: {fsuuid}") + except Exception: + pass + if fsuuid: + logger.debug(f"using UUID: {fsuuid}") + cmd.extend(["-U", fsuuid]) + + if not fslabel: + try: + fslabel = cls.get_attrs_by_dev("LABEL", dev) + assert fslabel + logger.debug(f"reuse previous fs LABEL: {fslabel}") + except Exception: + pass + if fslabel: + logger.debug(f"using fs LABEL: {fslabel}") + cmd.extend(["-L", fslabel]) + + cmd.append(dev) + logger.warning(f"format {dev} to ext4: {cmd=}") + subprocess_call(cmd, raise_exception=raise_exception) @classmethod - def mount_ro(cls, *, target: str, mount_point: Union[str, Path]): - """Mount target on mount_point read-only. + def mount_ro( + cls, *, target: str, mount_point: str | Path, raise_exception: bool = True + ): + """Mount to read-only. - If the target device is mounted, we bind mount the target device to mount_point, + If the target device is mounted, we bind mount the target device to mount_point. if the target device is not mounted, we directly mount it to the mount_point. - This method mount the target as ro with make-private flag and make-unbindable flag, - to prevent ANY accidental writes/changes to the target. - - Raises: - MountError on failed mounting. + Args: + target (str): target to be mounted. + mount_point (str | Path): mount point to mount to. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. """ # NOTE: set raise_exception to false to allow not mounted # not mounted dev will have empty return str @@ -394,30 +360,47 @@ def mount_ro(cls, *, target: str, mount_point: Union[str, Path]): CMDHelperFuncs.bind_mount_ro( _active_mount_point, mount_point, + raise_exception=raise_exception, ) else: # target is not mounted, we mount it by ourself - options = ["ro"] - args = ["--make-private", "--make-unbindable"] - cls._mount(target, str(mount_point), options=options, args=args) + # fmt: off + cmd = [ + "mount", + "-o", "ro", + "--make-private", "--make-unbindable", + target, + str(mount_point), + ] + # fmt: on + subprocess_call(cmd, raise_exception=raise_exception) @classmethod - def reboot(cls): - """Reboot the whole system otaclient running at and terminate otaclient. + def reboot(cls, args: Optional[list[str]] = None) -> NoReturn: + """Reboot the system, with optional args passed to reboot command. + + This is implemented by calling: + reboot [args[0], args[1], ...] + + NOTE(20230614): this command makes otaclient exit immediately. + NOTE(20240421): rpi_boot's reboot takes args. - NOTE(20230614): this command MUST also make otaclient exit immediately. + Args: + args (Optional[list[str]], optional): args passed to reboot command. + Defaults to None, not passing any args. """ - os.sync() + cmd = ["reboot"] + if args: + cmd.extend(args) + try: - subprocess_call("reboot", raise_exception=True) + subprocess_call(cmd, raise_exception=True) sys.exit(0) except CalledProcessError: logger.exception("failed to reboot") raise -###### helper mixins ###### - FinalizeSwitchBootFunc = Callable[[], bool] @@ -665,114 +648,6 @@ def booted_ota_status(self) -> wrapper.StatusOta: return self._ota_status -class SlotInUseMixin: - current_ota_status_dir: Path - standby_ota_status_dir: Path - - def _store_current_slot_in_use(self, _slot: str): - write_str_to_file_sync( - self.current_ota_status_dir / cfg.SLOT_IN_USE_FNAME, _slot - ) - - def _store_standby_slot_in_use(self, _slot: str): - write_str_to_file_sync( - self.standby_ota_status_dir / cfg.SLOT_IN_USE_FNAME, _slot - ) - - def _load_current_slot_in_use(self) -> Optional[str]: - if res := read_str_from_file( - self.current_ota_status_dir / cfg.SLOT_IN_USE_FNAME, default="" - ): - return res - - -class OTAStatusMixin: - current_ota_status_dir: Path - standby_ota_status_dir: Path - ota_status: wrapper.StatusOta - - def _store_current_ota_status(self, _status: wrapper.StatusOta): - write_str_to_file_sync( - self.current_ota_status_dir / cfg.OTA_STATUS_FNAME, _status.name - ) - - def _store_standby_ota_status(self, _status: wrapper.StatusOta): - write_str_to_file_sync( - self.standby_ota_status_dir / cfg.OTA_STATUS_FNAME, _status.name - ) - - def _load_current_ota_status(self) -> Optional[wrapper.StatusOta]: - if _status_str := read_str_from_file( - self.current_ota_status_dir / cfg.OTA_STATUS_FNAME - ).upper(): - try: - return wrapper.StatusOta[_status_str] - except KeyError: - pass # invalid status string - - def get_booted_ota_status(self) -> wrapper.StatusOta: - return self.ota_status - - -class VersionControlMixin: - current_ota_status_dir: Path - standby_ota_status_dir: Path - - def _store_standby_version(self, _version: str): - write_str_to_file_sync( - self.standby_ota_status_dir / cfg.OTA_VERSION_FNAME, - _version, - ) - - def load_version(self) -> str: - _version = read_str_from_file( - self.current_ota_status_dir / cfg.OTA_VERSION_FNAME, - missing_ok=True, - default="", - ) - if not _version: - logger.warning("version file not found, return empty version string") - - return _version - - -class PrepareMountMixin: - standby_slot_mount_point: Path - ref_slot_mount_point: Path - - def _prepare_and_mount_standby(self, standby_slot_dev: str, *, erase=False): - self.standby_slot_mount_point.mkdir(parents=True, exist_ok=True) - - # first try umount the dev - CMDHelperFuncs.umount(standby_slot_dev) - - # format the whole standby slot if needed - if erase: - logger.warning(f"perform mkfs.ext4 on standby slot({standby_slot_dev})") - CMDHelperFuncs.mkfs_ext4(standby_slot_dev) - - # try to mount the standby dev - CMDHelperFuncs.mount_rw(standby_slot_dev, self.standby_slot_mount_point) - - def _mount_refroot( - self, - *, - standby_dev: str, - active_dev: str, - standby_as_ref: bool, - ): - CMDHelperFuncs.mount_refroot( - standby_slot_dev=standby_dev, - active_slot_dev=active_dev, - refroot_mount_point=str(self.ref_slot_mount_point), - standby_as_ref=standby_as_ref, - ) - - def _umount_all(self, *, ignore_error: bool = False): - CMDHelperFuncs.umount(self.standby_slot_mount_point, ignore_error=ignore_error) - CMDHelperFuncs.umount(self.ref_slot_mount_point, ignore_error=ignore_error) - - class SlotMountHelper: """Helper class that provides methods for mounting slots.""" @@ -800,57 +675,27 @@ def __init__( cfg.BOOT_DIR ).relative_to("/") - def mount_standby(self, *, raise_exc: bool = True) -> bool: - """Mount standby slot dev to . - - Args: - erase_standby: whether to format standby slot dev to ext4 before mounting. - raise_exc: if exception occurs, raise it or not. - - Return: - A bool indicates whether the mount succeeded or not. - """ + def mount_standby(self) -> None: + """Mount standby slot dev to .""" logger.debug("mount standby slot rootfs dev...") - try: - # first try umount mount point and dev - CMDHelperFuncs.umount(self.standby_slot_mount_point, ignore_error=True) - if CMDHelperFuncs.is_target_mounted(self.standby_slot_dev): - CMDHelperFuncs.umount(self.standby_slot_dev) - - # try to mount the standby dev - CMDHelperFuncs.mount_rw( - target=self.standby_slot_dev, - mount_point=self.standby_slot_mount_point, - ) - return True - except Exception: - if raise_exc: - raise - return False - - def mount_active(self, *, raise_exc: bool = True) -> bool: - """Mount active rootfs ready-only. + if CMDHelperFuncs.is_target_mounted( + self.standby_slot_dev, raise_exception=False + ): + logger.debug(f"{self.standby_slot_dev=} is mounted, try to umount it ...") + CMDHelperFuncs.umount(self.standby_slot_dev, raise_exception=False) - Args: - raise_exc: if exception occurs, raise it or not. + CMDHelperFuncs.mount_rw( + target=self.standby_slot_dev, + mount_point=self.standby_slot_mount_point, + ) - Return: - A bool indicates whether the mount succeeded or not. - """ - # first try umount the mount_point + def mount_active(self) -> None: + """Mount active rootfs ready-only.""" logger.debug("mount active slot rootfs dev...") - try: - CMDHelperFuncs.umount(self.active_slot_mount_point, ignore_error=True) - # mount active slot ro, unpropagated - CMDHelperFuncs.mount_ro( - target=self.active_slot_dev, - mount_point=self.active_slot_mount_point, - ) - return True - except Exception: - if raise_exc: - raise - return False + CMDHelperFuncs.mount_ro( + target=self.active_slot_dev, + mount_point=self.active_slot_mount_point, + ) def preserve_ota_folder_to_standby(self): """Copy the /boot/ota folder to standby slot to preserve it. @@ -866,10 +711,29 @@ def preserve_ota_folder_to_standby(self): except Exception as e: raise ValueError(f"failed to copy /boot/ota from active to standby: {e!r}") + def prepare_standby_dev( + self, + *, + erase_standby: bool = False, + fslabel: Optional[str] = None, + ) -> None: + CMDHelperFuncs.umount(self.standby_slot_dev, raise_exception=False) + if erase_standby: + return CMDHelperFuncs.mkfs_ext4(self.standby_slot_dev, fslabel=fslabel) + + # TODO: in the future if in-place update mode is implemented, do a + # fschck over the standby slot file system. + if fslabel: + CMDHelperFuncs.set_ext4_fslabel(self.active_slot_dev, fslabel=fslabel) + def umount_all(self, *, ignore_error: bool = False): logger.debug("unmount standby slot and active slot mount point...") - CMDHelperFuncs.umount(self.standby_slot_mount_point, ignore_error=ignore_error) - CMDHelperFuncs.umount(self.active_slot_mount_point, ignore_error=ignore_error) + CMDHelperFuncs.umount( + self.standby_slot_mount_point, raise_exception=ignore_error + ) + CMDHelperFuncs.umount( + self.active_slot_mount_point, raise_exception=ignore_error + ) def cat_proc_cmdline(target: str = "/proc/cmdline") -> str: diff --git a/otaclient/app/boot_control/_errors.py b/otaclient/app/boot_control/_errors.py deleted file mode 100644 index b2ba7bbd3..000000000 --- a/otaclient/app/boot_control/_errors.py +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -r"""Internal error types used by boot_control module.""" - -from enum import Enum, auto -from subprocess import CalledProcessError -from typing import Optional, Callable - - -class MountFailedReason(Enum): - # error code - SUCCESS = 0 - PERMISSIONS_ERROR = 1 - SYSTEM_ERROR = 2 - INTERNAL_ERROR = 4 - USER_INTERRUPT = 8 - GENERIC_MOUNT_FAILURE = 32 - - # custom error code - # specific reason for generic mount failure - TARGET_NOT_FOUND = auto() - TARGET_ALREADY_MOUNTED = auto() - MOUNT_POINT_NOT_FOUND = auto() - BIND_MOUNT_ON_NON_DIR = auto() - - @classmethod - def parse_failed_reason(cls, func: Callable): - from functools import wraps - - @wraps(func) - def _wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except CalledProcessError as e: - _called_process_error_str = ( - f"{e.cmd=} failed: {e.returncode=}, {e.stderr=}, {e.stdout=}" - ) - - _reason = cls(e.returncode) - if _reason != cls.GENERIC_MOUNT_FAILURE: - raise MountError( - _called_process_error_str, - fail_reason=_reason, - ) from None - - # if the return code is 32, determine the detailed reason - # of the mount failure - _error_msg, _fail_reason = str(e.stderr), cls.GENERIC_MOUNT_FAILURE - if _error_msg.find("already mounted") != -1: - _fail_reason = cls.TARGET_ALREADY_MOUNTED - elif _error_msg.find("mount point does not exist") != -1: - _fail_reason = cls.MOUNT_POINT_NOT_FOUND - elif _error_msg.find("does not exist") != -1: - _fail_reason = cls.TARGET_NOT_FOUND - elif _error_msg.find("Not a directory") != -1: - _fail_reason = cls.BIND_MOUNT_ON_NON_DIR - - raise MountError( - _called_process_error_str, - fail_reason=_fail_reason, - ) from None - - return _wrapper - - -class BootControlError(Exception): - """Internal used exception type related to boot control errors. - NOTE: this exception should not be directly raised to upper caller, - it should always be wrapped by a specific OTA Error type. - check app.errors for details. - """ - - -class MountError(BootControlError): - def __init__( - self, *args: object, fail_reason: Optional["MountFailedReason"] = None - ) -> None: - super().__init__(*args) - self.fail_reason = fail_reason - - -class ABPartitionError(BootControlError): - ... - - -class MkfsError(BootControlError): - ... diff --git a/otaclient/app/boot_control/_grub.py b/otaclient/app/boot_control/_grub.py index f4678f248..9cc4cdc89 100644 --- a/otaclient/app/boot_control/_grub.py +++ b/otaclient/app/boot_control/_grub.py @@ -31,6 +31,7 @@ """ +from __future__ import annotations import logging import re import shutil @@ -327,9 +328,16 @@ def _get_sibling_dev(self, active_dev: str) -> str: raise ValueError(_err_msg) # list children device file from parent device - cmd = f"-Pp -o NAME,FSTYPE {parent}" + cmd = ["lsblk", "-Ppo", "NAME,FSTYPE", parent] + try: + cmd_result = subprocess_check_output(cmd, raise_exception=True) + except Exception as e: + _err_msg = f"failed to detect boot device family tree: {e!r}" + logger.error(_err_msg) + raise _GrubBootControllerError(_err_msg) from e + # exclude parent dev - output = CMDHelperFuncs._lsblk(cmd).splitlines()[1:] + output = cmd_result.splitlines()[1:] # FSTYPE="ext4" and # not (parent_device_file, root_device_file and boot_device_file) for blk in output: @@ -352,7 +360,14 @@ def _detect_active_slot(self) -> Tuple[str, str]: A tuple contains the slot_name and the full dev path of the active slot. """ - dev_path = CMDHelperFuncs.get_current_rootfs_dev() + try: + dev_path = CMDHelperFuncs.get_current_rootfs_dev() + assert dev_path + except Exception as e: + _err_msg = f"failed to detect current rootfs dev: {e!r}" + logger.error(_err_msg) + raise _GrubBootControllerError(_err_msg) from e + _dev_path_ma = self.DEV_PATH_PA.match(dev_path) assert _dev_path_ma, f"dev path is invalid for OTA: {dev_path}" @@ -605,7 +620,17 @@ def _grub_update_on_booted_slot(self, *, abort_on_standby_missed=True): active_slot_grub_file = self.active_ota_partition_folder / cfg.GRUB_CFG_FNAME grub_cfg_content = GrubHelper.grub_mkconfig() - standby_uuid_str = CMDHelperFuncs.get_uuid_str_by_dev(self.standby_root_dev) + try: + standby_uuid = CMDHelperFuncs.get_attrs_by_dev( + "UUID", self.standby_root_dev + ) + assert standby_uuid + except Exception as e: + _err_msg = f"failed to get UUID of {self.standby_root_dev}: {e!r}" + logger.error(_err_msg) + raise _GrubBootControllerError(_err_msg) from e + + standby_uuid_str = f"UUID={standby_uuid}" if grub_cfg_updated := GrubHelper.update_entry_rootfs( grub_cfg_content, kernel_ver=GrubHelper.SUFFIX_OTA_STANDBY, @@ -667,27 +692,6 @@ def _ensure_standby_slot_boot_files_symlinks(self, standby_slot: str): # API - def prepare_standby_dev(self, *, erase_standby: bool): - """ - Args: - erase_standby: indicate boot_controller whether to format the - standby slot's file system or not. This value is indicated and - passed to boot controller by the standby slot creator. - """ - try: - # try to unmount the standby root dev unconditionally - if CMDHelperFuncs.is_target_mounted(self.standby_root_dev): - CMDHelperFuncs.umount(self.standby_root_dev) - - if erase_standby: - CMDHelperFuncs.mkfs_ext4(self.standby_root_dev) - # TODO: check the standby file system status - # if not erase the standby slot - except Exception as e: - _err_msg = f"failed to prepare standby dev: {e!r}" - logger.error(_err_msg) - raise _GrubBootControllerError(_err_msg) from e - def finalize_update_switch_boot(self): """Finalize switch boot and use boot files from current booted slot.""" # NOTE: since we have not yet switched boot, the active/standby relationship is @@ -769,9 +773,19 @@ def _update_fstab(self, *, active_slot_fstab: Path, standby_slot_fstab: Path): Override existed entries in standby fstab, merge new entries from active fstab. """ - standby_uuid_str = CMDHelperFuncs.get_uuid_str_by_dev( - self._boot_control.standby_root_dev - ) + try: + standby_uuid = CMDHelperFuncs.get_attrs_by_dev( + "UUID", self._boot_control.standby_root_dev + ) + assert standby_uuid + except Exception as e: + _err_msg = ( + f"failed to get UUID of {self._boot_control.standby_root_dev}: {e!r}" + ) + logger.error(_err_msg) + raise _GrubBootControllerError(_err_msg) from e + + standby_uuid_str = f"UUID={standby_uuid}" fstab_entry_pa = re.compile( r"^\s*(?P[^# ]*)\s+" r"(?P[^ ]*)\s+" @@ -869,7 +883,7 @@ def pre_update(self, version: str, *, standby_as_ref: bool, erase_standby=False) self._ota_status_control.pre_update_current() ### mount slots ### - self._boot_control.prepare_standby_dev(erase_standby=erase_standby) + self._mp_control.prepare_standby_dev(erase_standby=erase_standby) self._mp_control.mount_standby() self._mp_control.mount_active() diff --git a/otaclient/app/boot_control/_rpi_boot.py b/otaclient/app/boot_control/_rpi_boot.py index 46c4a2bdd..69c3e2abd 100644 --- a/otaclient/app/boot_control/_rpi_boot.py +++ b/otaclient/app/boot_control/_rpi_boot.py @@ -14,6 +14,7 @@ """Boot control support for Raspberry pi 4 Model B.""" +from __future__ import annotations import logging import os import re @@ -23,7 +24,7 @@ from .. import errors as ota_errors from ..proto import wrapper -from ..common import replace_atomic, subprocess_call +from ..common import replace_atomic, subprocess_call, subprocess_check_output from ._common import ( OTAStatusFilesControl, @@ -77,9 +78,8 @@ class _RPIBootControl: def __init__(self) -> None: self.system_boot_path = Path(cfg.SYSTEM_BOOT_MOUNT_POINT) - if not ( - self.system_boot_path.is_dir() - and CMDHelperFuncs.is_target_mounted(self.system_boot_path) + if not CMDHelperFuncs.is_target_mounted( + self.system_boot_path, raise_exception=False ): _err_msg = "system-boot is not presented or not mounted!" logger.error(_err_msg) @@ -92,21 +92,22 @@ def _init_slots_info(self): logger.debug("checking and initializing slots info...") try: # detect active slot - self._active_slot_dev = CMDHelperFuncs.get_dev_by_mount_point( - cfg.ACTIVE_ROOTFS_PATH + _active_slot_dev = CMDHelperFuncs.get_current_rootfs_dev() + assert _active_slot_dev + self._active_slot_dev = _active_slot_dev + + _active_slot = CMDHelperFuncs.get_attrs_by_dev( + "LABEL", str(self._active_slot_dev) ) - if not ( - _active_slot := CMDHelperFuncs.get_fslabel_by_dev(self._active_slot_dev) - ): - raise ValueError( - f"failed to get slot_id(fslabel) for active slot dev({self._active_slot_dev})" - ) + assert _active_slot self._active_slot = _active_slot # detect standby slot # NOTE: using the similar logic like grub, detect the silibing dev # of the active slot as standby slot - _parent = CMDHelperFuncs.get_parent_dev(self._active_slot_dev) + _parent = CMDHelperFuncs.get_parent_dev(str(self._active_slot_dev)) + assert _parent + # list children device file from parent device # exclude parent dev(always in the front) # expected raw result from lsblk: @@ -114,7 +115,11 @@ def _init_slots_info(self): # NAME="/dev/sdx1" # system-boot # NAME="/dev/sdx2" # slot_a # NAME="/dev/sdx3" # slot_b - _raw_child_partitions = CMDHelperFuncs._lsblk(f"-Pp -o NAME {_parent}") + _check_dev_family_cmd = ["lsblk", "-Ppo", "NAME", _parent] + _raw_child_partitions = subprocess_check_output( + _check_dev_family_cmd, raise_exception=True + ) + try: # NOTE: exclude the first 2 lines(parent and system-boot) _child_partitions = list( @@ -315,38 +320,12 @@ def finalize_switching_boot(self) -> bool: # reboot to the same slot to apply the new firmware logger.info("reboot to apply new firmware...") - CMDHelperFuncs.reboot() - - # NOTE(20230614): after calling reboot, otaclient SHOULD be terminated or exception raised - # on failed reboot command subprocess call. But if somehow it doesn't, - # it should be treated as failure and return False here. - return False - + CMDHelperFuncs.reboot() # this function will make otaclient exit immediately except Exception as e: _err_msg = f"failed to finalize boot switching: {e!r}" logger.error(_err_msg) return False - def prepare_standby_dev(self, *, erase_standby: bool): - # try umount and dev - if CMDHelperFuncs.is_target_mounted(self.standby_slot_dev): - CMDHelperFuncs.umount(self.standby_slot_dev) - try: - if erase_standby: - CMDHelperFuncs.mkfs_ext4( - self.standby_slot_dev, - fslabel=self.standby_slot, - ) - else: - # TODO: check the standby file system status - # if not erase the standby slot - # set the standby file system label with standby slot id - CMDHelperFuncs.set_dev_fslabel(self.active_slot_dev, self.standby_slot) - except Exception as e: - _err_msg = f"failed to prepare standby dev: {e!r}" - logger.error(_err_msg) - raise _RPIBootControllerError(_err_msg) from e - def prepare_tryboot_txt(self): """Copy the standby slot's config.txt as tryboot.txt.""" logger.debug("prepare tryboot.txt...") @@ -364,8 +343,7 @@ def reboot_tryboot(self): """Reboot with tryboot flag.""" logger.info(f"tryboot reboot to standby slot({self.standby_slot})...") try: - _cmd = "reboot '0 tryboot'" - subprocess_call(_cmd, raise_exception=True) + CMDHelperFuncs.reboot(args=["0 tryboot"]) except Exception as e: _err_msg = "failed to reboot" logger.exception(_err_msg) @@ -496,7 +474,10 @@ def pre_update(self, version: str, *, standby_as_ref: bool, erase_standby: bool) self._ota_status_control.pre_update_current() ### mount slots ### - self._rpiboot_control.prepare_standby_dev(erase_standby=erase_standby) + self._mp_control.prepare_standby_dev( + erase_standby=erase_standby, + fslabel=self._rpiboot_control.standby_slot, + ) self._mp_control.mount_standby() self._mp_control.mount_active() diff --git a/otaclient/app/boot_control/selecter.py b/otaclient/app/boot_control/selecter.py index 369e2835f..93ee5190a 100644 --- a/otaclient/app/boot_control/selecter.py +++ b/otaclient/app/boot_control/selecter.py @@ -20,7 +20,6 @@ from typing_extensions import deprecated from .configs import BootloaderType -from ._errors import BootControlError from .protocol import BootControllerProtocol from ..common import read_str_from_file @@ -82,13 +81,9 @@ def get_boot_controller( from ._grub import GrubController return GrubController - if bootloader_type == BootloaderType.CBOOT: - from ._cboot import CBootController - - return CBootController if bootloader_type == BootloaderType.RPI_BOOT: from ._rpi_boot import RPIBootController return RPIBootController - raise BootControlError from NotImplementedError(f"unsupported: {bootloader_type=}") + raise NotImplementedError(f"unsupported: {bootloader_type=}") diff --git a/otaclient/app/common.py b/otaclient/app/common.py index 4355f48c2..536cbcb7f 100644 --- a/otaclient/app/common.py +++ b/otaclient/app/common.py @@ -118,45 +118,99 @@ def write_str_to_file_sync(path: Union[Path, str], input: str): os.fsync(f.fileno()) -# wrapped subprocess call -def subprocess_call(cmd: str, *, raise_exception=False): +def subprocess_run_wrapper( + cmd: str | list[str], + *, + check: bool, + check_output: bool, + timeout: Optional[float] = None, +) -> subprocess.CompletedProcess[bytes]: + """A wrapper for subprocess.run method. + + NOTE: this is for the requirement of customized subprocess call + in the future, like chroot or nsenter before execution. + + Args: + cmd (str | list[str]): command to be executed. + check (bool): if True, raise CalledProcessError on non 0 return code. + check_output (bool): if True, the UTF-8 decoded stdout will be returned. + timeout (Optional[float], optional): timeout for execution. Defaults to None. + + Returns: + subprocess.CompletedProcess[bytes]: the result of the execution. """ + if isinstance(cmd, str): + cmd = shlex.split(cmd) + + return subprocess.run( + cmd, + check=check, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE if check_output else None, + timeout=timeout, + ) - Raises: - a ValueError containing information about the failure. + +def subprocess_check_output( + cmd: str | list[str], + *, + raise_exception: bool = False, + default: str = "", + timeout: Optional[float] = None, +) -> str: + """Run the and return UTF-8 decoded stripped stdout. + + Args: + cmd (str | list[str]): command to be executed. + raise_exception (bool, optional): raise the underlying CalledProcessError. Defaults to False. + default (str, optional): if is False, return on underlying + subprocess call failed. Defaults to "". + timeout (Optional[float], optional): timeout for execution. Defaults to None. + + Returns: + str: UTF-8 decoded stripped stdout. """ try: - # NOTE: we need to check the stderr and stdout when error occurs, - # so use subprocess.run here instead of subprocess.check_call - subprocess.run( - shlex.split(cmd), - check=True, - capture_output=True, + res = subprocess_run_wrapper( + cmd, check=True, check_output=True, timeout=timeout ) + return res.stdout.decode().strip() except subprocess.CalledProcessError as e: - msg = f"command({cmd=}) failed({e.returncode=}, {e.stderr=}, {e.stdout=})" - logger.debug(msg) + _err_msg = ( + f"command({cmd=}) failed(retcode={e.returncode}: \n" + f"stderr={e.stderr.decode()}" + ) + logger.debug(_err_msg) + if raise_exception: raise + return default -def subprocess_check_output(cmd: str, *, raise_exception=False, default="") -> str: - """ - Raises: - a ValueError containing information about the failure. +def subprocess_call( + cmd: str | list[str], + *, + raise_exception: bool = False, + timeout: Optional[float] = None, +) -> None: + """Run the . + + Args: + cmd (str | list[str]): command to be executed. + raise_exception (bool, optional): raise the underlying CalledProcessError. Defaults to False. + timeout (Optional[float], optional): timeout for execution. Defaults to None. """ try: - return ( - subprocess.check_output(shlex.split(cmd), stderr=subprocess.PIPE) - .decode() - .strip() - ) + subprocess_run_wrapper(cmd, check=True, check_output=False, timeout=timeout) except subprocess.CalledProcessError as e: - msg = f"command({cmd=}) failed({e.returncode=}, {e.stderr=}, {e.stdout=})" - logger.debug(msg) + _err_msg = ( + f"command({cmd=}) failed(retcode={e.returncode}: \n" + f"stderr={e.stderr.decode()}" + ) + logger.debug(_err_msg) + if raise_exception: raise - return default def copy_stat(src: Union[Path, str], dst: Union[Path, str]): @@ -169,6 +223,8 @@ def copy_stat(src: Union[Path, str], dst: Union[Path, str]): def copytree_identical(src: Path, dst: Path): """Recursively copy from the src folder to dst folder. + Source folder MUST be a dir. + This function populate files/dirs from the src to the dst, and make sure the dst is identical to the src. @@ -189,8 +245,13 @@ def copytree_identical(src: Path, dst: Path): NOTE: is_file/is_dir also returns True if it is a symlink and the link target is_file/is_dir """ + if src.is_symlink() or not src.is_dir(): + raise ValueError(f"{src} is not a dir") + if dst.is_symlink() or not dst.is_dir(): - raise FileNotFoundError(f"{dst} is not found or not a dir") + logger.info(f"{dst=} doesn't exist or not a dir, cleanup and mkdir") + dst.unlink(missing_ok=True) # unlink doesn't follow the symlink + dst.mkdir(mode=src.stat().st_mode, parents=True) # phase1: populate files to the dst for cur_dir, dirs, files in os.walk(src, topdown=True, followlinks=False): @@ -502,7 +563,7 @@ def shutdown(self, *, raise_last_exc: bool): logger.debug("shutdown retry task map") if self._running_inst: self._running_inst.shutdown(raise_last_exc=raise_last_exc) - # NOTE: passthrough the exception from underlaying running_inst + # NOTE: passthrough the exception from underlying running_inst finally: self._running_inst = None self._executor.shutdown(wait=True) diff --git a/otaclient/app/ota_client_stub.py b/otaclient/app/ota_client_stub.py index 63298374f..718e44df3 100644 --- a/otaclient/app/ota_client_stub.py +++ b/otaclient/app/ota_client_stub.py @@ -98,20 +98,27 @@ def _subprocess_init(self): def _mount_external_cache_storage(self): # detect cache_dev on every startup - _cache_dev = CMDHelperFuncs._findfs("LABEL", self._external_cache_dev_fslabel) + _cache_dev = CMDHelperFuncs.get_dev_by_token( + "LABEL", + self._external_cache_dev_fslabel, + raise_exception=False, + ) if not _cache_dev: return + if len(_cache_dev) > 1: + logger.warning( + f"multiple external cache storage device found, use the first one: {_cache_dev[0]}" + ) + _cache_dev = _cache_dev[0] + self.logger.info(f"external cache dev detected at {_cache_dev}") self._external_cache_dev = _cache_dev # try to unmount the mount_point and cache_dev unconditionally _mp = Path(self._external_cache_dev_mp) - CMDHelperFuncs.umount(_cache_dev, ignore_error=True) - if _mp.is_dir(): - CMDHelperFuncs.umount(self._external_cache_dev_mp, ignore_error=True) - else: - _mp.mkdir(parents=True, exist_ok=True) + CMDHelperFuncs.umount(_cache_dev, raise_exception=False) + _mp.mkdir(parents=True, exist_ok=True) # try to mount cache_dev ro try: diff --git a/otaclient/app/proto/README.md b/otaclient/app/proto/README.md index b1a177efa..4aab4ca71 100644 --- a/otaclient/app/proto/README.md +++ b/otaclient/app/proto/README.md @@ -143,7 +143,7 @@ After the compiled python code is ready, we can define the wrappers accordingly # type hints for each attributes when manually create instances # of wrapper types. # NOTE: this __init__ is just for typing use, it will not be - # used by the underlaying MessageWrapper + # used by the underlying MessageWrapper # NOTE: if pyi file is also generated when compiling the proto file, # the __init__ in pyi file can be used directly with little adjustment def __init__( diff --git a/tests/test_boot_control/test_cboot.py b/tests/test_boot_control/test_cboot.py deleted file mode 100644 index f0d7d8bc3..000000000 --- a/tests/test_boot_control/test_cboot.py +++ /dev/null @@ -1,308 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -r""" -CBOOT switch boot mechanism follow(normal successful case): -* Assume we are at slot-0, and apply an OTA update - -condition before OTA update: - current slot: 0, ota_status=SUCCESS, slot_in_use=0 - standby slot: 1 - -pre-update: - 1. store current ota_status(=FAILURE) - 2. store current slot_in_use(=1) - 3. set standby_slot unbootable - 4. prepare and mount standby(params: standby_rootfs_dev, erase_standby) - 5. mount refroot(params: standby_rootfs_dev, current_rootfs_dev, standby_as_ref) - 6. store standby ota_status(=UPDATING) - 7. store standby slot_in_use(=1), standby_version -post-update: - 1. update extlinux_cfg file - 2. (if external_rootfs_enabled) populate boot folder to bootdev - 3. umount all - 4. switch boot -first-reboot -init boot controller - 1. makr current slot boot successful - -condition after OTA update: - current slot: 1, ota_status=SUCCESS, slot_in_use=1 - standby slot: 0 -""" -from functools import partial -from pathlib import Path -import shutil -import typing -import pytest -import pytest_mock - -import logging - -from tests.utils import SlotMeta, compare_dir -from tests.conftest import TestConfiguration as cfg - -logger = logging.getLogger(__name__) - - -class CbootFSM: - def __init__(self) -> None: - self.current_slot = cfg.SLOT_A_ID_CBOOT - self.standby_slot = cfg.SLOT_B_ID_CBOOT - self.current_slot_bootable = True - self.standby_slot_bootable = True - - self.is_boot_switched = False - - def get_current_slot(self): - return self.current_slot - - def get_standby_slot(self): - return self.standby_slot - - def get_standby_partuuid_str(self): - if self.standby_slot == cfg.SLOT_B_ID_CBOOT: - return f"PARTUUID={cfg.SLOT_B_PARTUUID}" - else: - return f"PARTUUID={cfg.SLOT_A_PARTUUID}" - - def is_current_slot_bootable(self): - return self.current_slot_bootable - - def is_standby_slot_bootable(self): - return self.standby_slot_bootable - - def mark_current_slot_as(self, bootable: bool): - self.current_slot_bootable = bootable - - def mark_standby_slot_as(self, bootable: bool): - self.standby_slot_bootable = bootable - - def switch_boot(self): - self.current_slot, self.standby_slot = self.standby_slot, self.current_slot - self.current_slot_bootable, self.standby_slot_bootable = ( - self.standby_slot_bootable, - self.current_slot_bootable, - ) - self.is_boot_switched = True - - -class TestCBootControl: - EXTLNUX_CFG_SLOT_A = Path(__file__).parent / "extlinux.conf_slot_a" - EXTLNUX_CFG_SLOT_B = Path(__file__).parent / "extlinux.conf_slot_b" - - def cfg_for_slot_a_as_current(self): - """ - NOTE: we always only refer to ota-status dir at the rootfs! - """ - from otaclient.app.boot_control.configs import CBootControlConfig - - _mocked_cboot_cfg = CBootControlConfig() - _mocked_cboot_cfg.MOUNT_POINT = str(self.slot_b) # type: ignore - _mocked_cboot_cfg.ACTIVE_ROOT_MOUNT_POINT = str(self.slot_a) # type: ignore - # NOTE: SEPARATE_BOOT_MOUNT_POINT is the root of the boot device! - _mocked_cboot_cfg.SEPARATE_BOOT_MOUNT_POINT = str(self.slot_b_boot_dev) - _mocked_cboot_cfg.ACTIVE_ROOTFS_PATH = str(self.slot_a) # type: ignore - return _mocked_cboot_cfg - - def cfg_for_slot_b_as_current(self): - from otaclient.app.boot_control.configs import CBootControlConfig - - _mocked_cboot_cfg = CBootControlConfig() - _mocked_cboot_cfg.MOUNT_POINT = str(self.slot_a) # type: ignore - _mocked_cboot_cfg.ACTIVE_ROOT_MOUNT_POINT = str(self.slot_b) # type: ignore - # NOTE: SEPARATE_BOOT_MOUNT_POINT is the root of the boot device! - _mocked_cboot_cfg.SEPARATE_BOOT_MOUNT_POINT = str(self.slot_a_boot_dev) - _mocked_cboot_cfg.ACTIVE_ROOTFS_PATH = str(self.slot_b) # type: ignore - return _mocked_cboot_cfg - - @pytest.fixture - def cboot_ab_slot(self, ab_slots: SlotMeta): - """ - TODO: not considering rootfs on internal storage now - boot folder structure for cboot: - boot_dir_{slot_a, slot_b}/ - ota-status/ - status - version - slot_in_use - """ - self.slot_a = Path(ab_slots.slot_a) - self.slot_b = Path(ab_slots.slot_b) - self.slot_a_boot_dev = Path(ab_slots.slot_a_boot_dev) - self.slot_b_boot_dev = Path(ab_slots.slot_b_boot_dev) - self.slot_a_uuid = cfg.SLOT_A_PARTUUID - self.slot_b_uuid = cfg.SLOT_B_PARTUUID - - # prepare ota_status dir for slot_a - self.slot_a_ota_status_dir = self.slot_a / Path(cfg.OTA_STATUS_DIR).relative_to( - "/" - ) - self.slot_a_ota_status_dir.mkdir(parents=True) - slot_a_ota_status = self.slot_a_ota_status_dir / "status" - slot_a_ota_status.write_text("SUCCESS") - slot_a_version = self.slot_a_ota_status_dir / "version" - slot_a_version.write_text(cfg.CURRENT_VERSION) - slot_a_slot_in_use = self.slot_a_ota_status_dir / "slot_in_use" - slot_a_slot_in_use.write_text(cfg.SLOT_A_ID_CBOOT) - # also prepare a copy of boot folder to rootfs - shutil.copytree( - self.slot_a_boot_dev / Path(cfg.BOOT_DIR).relative_to("/"), - self.slot_a / Path(cfg.BOOT_DIR).relative_to("/"), - dirs_exist_ok=True, - ) - - # prepare extlinux file - extlinux_dir = self.slot_a / "boot/extlinux" - extlinux_dir.mkdir() - extlinux_cfg = extlinux_dir / "extlinux.conf" - extlinux_cfg.write_text(self.EXTLNUX_CFG_SLOT_A.read_text()) - - # ota_status dir for slot_b(separate boot dev) - self.slot_b_ota_status_dir = self.slot_b / Path(cfg.OTA_STATUS_DIR).relative_to( - "/" - ) - - @pytest.fixture - def mock_setup( - self, - mocker: pytest_mock.MockerFixture, - cboot_ab_slot, - ): - from otaclient.app.boot_control._cboot import _CBootControl - from otaclient.app.boot_control._common import CMDHelperFuncs - - ###### start fsm ###### - self._fsm = CbootFSM() - - ###### mocking _CBootControl ###### - _CBootControl_mock = typing.cast( - _CBootControl, mocker.MagicMock(spec=_CBootControl) - ) - # mock methods - _CBootControl_mock.get_current_slot = mocker.MagicMock( - wraps=self._fsm.get_current_slot - ) - _CBootControl_mock.get_standby_slot = mocker.MagicMock( - wraps=self._fsm.get_standby_slot - ) - _CBootControl_mock.get_standby_rootfs_partuuid_str = mocker.MagicMock( - wraps=self._fsm.get_standby_partuuid_str - ) - _CBootControl_mock.mark_current_slot_boot_successful.side_effect = partial( - self._fsm.mark_current_slot_as, True - ) - _CBootControl_mock.set_standby_slot_unbootable.side_effect = partial( - self._fsm.mark_standby_slot_as, False - ) - _CBootControl_mock.switch_boot.side_effect = self._fsm.switch_boot - _CBootControl_mock.is_current_slot_marked_successful = mocker.MagicMock( - wraps=self._fsm.is_current_slot_bootable - ) - # NOTE: we only test external rootfs - _CBootControl_mock.is_external_rootfs_enabled.return_value = True - # make update_extlinux_cfg as it - _CBootControl_mock.update_extlinux_cfg = mocker.MagicMock( - wraps=_CBootControl.update_extlinux_cfg - ) - - ###### mocking _CMDHelper ###### - _CMDHelper_mock = typing.cast( - CMDHelperFuncs, mocker.MagicMock(spec=CMDHelperFuncs) - ) - - ###### patching ###### - # patch _CBootControl - _CBootControl_path = f"{cfg.CBOOT_MODULE_PATH}._CBootControl" - mocker.patch(_CBootControl_path, return_value=_CBootControl_mock) - # patch CMDHelperFuncs - # NOTE: also remember to patch CMDHelperFuncs in common - mocker.patch(f"{cfg.CBOOT_MODULE_PATH}.CMDHelperFuncs", _CMDHelper_mock) - mocker.patch( - f"{cfg.BOOT_CONTROL_COMMON_MODULE_PATH}.CMDHelperFuncs", _CMDHelper_mock - ) - mocker.patch(f"{cfg.CBOOT_MODULE_PATH}.Nvbootctrl") - - ###### binding mocked object to test instance ###### - self._CBootControl_mock = _CBootControl_mock - self._CMDHelper_mock = _CMDHelper_mock - - def test_cboot_normal_update(self, mocker: pytest_mock.MockerFixture, mock_setup): - from otaclient.app.boot_control._cboot import CBootController - - _cfg_patch_path = f"{cfg.CBOOT_MODULE_PATH}.cfg" - _relative_ota_status_path = Path(cfg.OTA_STATUS_DIR).relative_to("/") - - ###### stage 1 ###### - mocker.patch(_cfg_patch_path, self.cfg_for_slot_a_as_current()) - logger.info("init cboot controller...") - cboot_controller = CBootController() - assert ( - self.slot_a / _relative_ota_status_path / "status" - ).read_text() == "SUCCESS" - - # test pre-update - cboot_controller.pre_update( - version=cfg.UPDATE_VERSION, - standby_as_ref=False, # NOTE: not used - erase_standby=False, # NOTE: not used - ) - # assert current slot ota-status - assert ( - self.slot_a / _relative_ota_status_path / "status" - ).read_text() == "FAILURE" - assert ( - self.slot_a / "boot/ota-status/slot_in_use" - ).read_text() == self._fsm.get_standby_slot() - # assert standby slot ota-status - assert ( - self.slot_b / _relative_ota_status_path / "status" - ).read_text() == "UPDATING" - assert ( - self.slot_b / _relative_ota_status_path / "version" - ).read_text() == cfg.UPDATE_VERSION - assert ( - self.slot_b / _relative_ota_status_path / "slot_in_use" - ).read_text() == self._fsm.get_standby_slot() - - logger.info("pre-update completed, entering post-update...") - # NOTE: standby slot's extlinux file is not yet populated(done by create_standby) - # prepare it by ourself - # NOTE 2: populate to standby rootfs' boot folder - standby_extlinux_dir = self.slot_b / "boot/extlinux" - standby_extlinux_dir.mkdir() - standby_extlinux_file = standby_extlinux_dir / "extlinux.conf" - standby_extlinux_file.write_text(self.EXTLNUX_CFG_SLOT_A.read_text()) - - # test post-update - _post_updater = cboot_controller.post_update() - next(_post_updater) - next(_post_updater, None) - assert ( - self.slot_b_boot_dev / "boot/extlinux/extlinux.conf" - ).read_text() == self.EXTLNUX_CFG_SLOT_B.read_text() - self._CBootControl_mock.switch_boot.assert_called_once() - self._CMDHelper_mock.reboot.assert_called_once() - # assert separate bootdev is populated correctly - compare_dir(self.slot_b / "boot", self.slot_b_boot_dev / "boot") - - ###### stage 2 ###### - logger.info("post-update completed, test init after first reboot...") - mocker.patch(_cfg_patch_path, self.cfg_for_slot_b_as_current()) - cboot_controller = CBootController() - assert ( - self.slot_b / _relative_ota_status_path / "status" - ).read_text() == "SUCCESS" - assert self._fsm.is_boot_switched diff --git a/tests/test_boot_control/test_grub.py b/tests/test_boot_control/test_grub.py index 180a5d6a1..0c70ce1ea 100644 --- a/tests/test_boot_control/test_grub.py +++ b/tests/test_boot_control/test_grub.py @@ -35,8 +35,8 @@ def __init__(self, slot_a_mp, slot_b_mp) -> None: self._standby_slot = cfg.SLOT_B_ID_GRUB self._current_slot_mp = Path(slot_a_mp) self._standby_slot_mp = Path(slot_b_mp) - self._current_slot_dev_uuid = f"UUID={cfg.SLOT_A_UUID}" - self._standby_slot_dev_uuid = f"UUID={cfg.SLOT_B_UUID}" + self._current_slot_dev_uuid = cfg.SLOT_A_UUID + self._standby_slot_dev_uuid = cfg.SLOT_B_UUID self.current_slot_bootable = True self.standby_slot_bootable = True @@ -63,7 +63,7 @@ def get_standby_slot_mp(self) -> Path: def get_standby_boot_dir(self) -> Path: return self._standby_slot_mp / "boot" - def get_uuid_str_by_dev(self, dev: str): + def get_attrs_by_dev(self, attr: str, dev: str) -> str: if dev == self.get_standby_slot_dev(): return self._standby_slot_dev_uuid else: @@ -271,8 +271,8 @@ def mock_setup( CMDHelperFuncs, mocker.MagicMock(spec=CMDHelperFuncs) ) _CMDHelper_mock.reboot.side_effect = self._fsm.switch_boot - _CMDHelper_mock.get_uuid_str_by_dev = mocker.MagicMock( - wraps=self._fsm.get_uuid_str_by_dev + _CMDHelper_mock.get_attrs_by_dev = mocker.MagicMock( + wraps=self._fsm.get_attrs_by_dev ) # bind the mocker to the test instance self._CMDHelper_mock = _CMDHelper_mock diff --git a/tests/test_common.py b/tests/test_common.py index 534e311da..850a865a8 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -19,7 +19,6 @@ import pytest import random import logging -from concurrent.futures import ThreadPoolExecutor from functools import partial from hashlib import sha256 from multiprocessing import Process @@ -38,11 +37,10 @@ subprocess_call, subprocess_check_output, verify_file, - wait_with_backoff, write_str_to_file_sync, ) from tests.utils import compare_dir -from tests.conftest import cfg, run_http_server +from tests.conftest import run_http_server logger = logging.getLogger(__name__) @@ -97,39 +95,6 @@ def test_write_to_file_sync(tmp_path: Path): assert _path.read_text() == _TEST_FILE_CONTENT -def test_subprocess_call(): - with pytest.raises(subprocess.CalledProcessError) as e: - subprocess_call("ls /non-existed", raise_exception=True) - _origin_e = e.value - assert _origin_e.returncode == 2 - assert ( - _origin_e.stderr.decode().strip() - == "ls: cannot access '/non-existed': No such file or directory" - ) - - subprocess_call("ls /non-existed", raise_exception=False) - - -def test_subprocess_check_output(file_t: Tuple[str, str, int]): - _path, _, _ = file_t - with pytest.raises(subprocess.CalledProcessError) as e: - subprocess_check_output("cat /non-existed", raise_exception=True) - _origin_e = e.value - assert _origin_e.returncode == 1 - assert ( - _origin_e.stderr.decode().strip() - == "cat: /non-existed: No such file or directory" - ) - - assert ( - subprocess_check_output( - "cat /non-existed", raise_exception=False, default="abc" - ) - == "abc" - ) - assert subprocess_check_output(f"cat {_path}") == _TEST_FILE_CONTENT - - class Test_copytree_identical: @pytest.fixture(autouse=True) def setup(self, tmp_path: Path): @@ -266,7 +231,7 @@ def test_symlink_to_directory(self, tmp_path: Path): class _RetryTaskMapTestErr(Exception): - ... + """""" class TestRetryTaskMap: @@ -445,3 +410,58 @@ def test_probing_delayed_online_server(self, subprocess_launch_server): ) # probing should cost at least seconds assert int(time.time()) >= start_time + self.LAUNCH_DELAY + + +class TestSubprocessCall: + TEST_FILE_CONTENTS = "test file contents" + + @pytest.fixture(autouse=True) + def setup_test(self, tmp_path: Path): + test_file = tmp_path / "test_file" + test_file.write_text(self.TEST_FILE_CONTENTS) + self.existed_file = test_file + self.non_existed_file = tmp_path / "non_existed_file" + + def test_subprocess_call_failed(self): + cmd = ["ls", str(self.non_existed_file)] + with pytest.raises(subprocess.CalledProcessError) as e: + subprocess_call(cmd, raise_exception=True) + origin_exc: subprocess.CalledProcessError = e.value + + assert origin_exc.returncode == 2 + assert ( + origin_exc.stderr.decode().strip() + == f"ls: cannot access '{self.non_existed_file}': No such file or directory" + ) + + # test exception supressed + subprocess_call(cmd, raise_exception=False) + + def test_subprocess_call_succeeded(self): + cmd = ["ls", str(self.existed_file)] + subprocess_call(cmd, raise_exception=True) + + def test_subprocess_check_output_failed(self): + cmd = ["cat", str(self.non_existed_file)] + + with pytest.raises(subprocess.CalledProcessError) as e: + subprocess_check_output(cmd, raise_exception=True) + origin_exc: subprocess.CalledProcessError = e.value + assert origin_exc.returncode == 1 + assert ( + origin_exc.stderr.decode().strip() + == f"cat: {self.non_existed_file}: No such file or directory" + ) + + # test exception supressed + default_value = "test default_value" + output = subprocess_check_output( + cmd, default=default_value, raise_exception=False + ) + assert output == default_value + + def test_subprocess_check_output_succeeded(self): + cmd = ["cat", str(self.existed_file)] + + output = subprocess_check_output(cmd, raise_exception=True) + assert output == self.TEST_FILE_CONTENTS