diff --git a/otaclient/app/boot_control/_common.py b/otaclient/app/boot_control/_common.py index 189287405..bdfecb04c 100644 --- a/otaclient/app/boot_control/_common.py +++ b/otaclient/app/boot_control/_common.py @@ -726,7 +726,7 @@ def prepare_standby_dev( if fslabel: CMDHelperFuncs.set_ext4_fslabel(self.active_slot_dev, fslabel=fslabel) - def umount_all(self, *, ignore_error: bool = False): + def umount_all(self, *, ignore_error: bool = True): logger.debug("unmount standby slot and active slot mount point...") CMDHelperFuncs.umount( self.standby_slot_mount_point, raise_exception=ignore_error diff --git a/otaclient/app/boot_control/_jetson_cboot.py b/otaclient/app/boot_control/_jetson_cboot.py index dbbe8dc42..35fbe9cfe 100644 --- a/otaclient/app/boot_control/_jetson_cboot.py +++ b/otaclient/app/boot_control/_jetson_cboot.py @@ -11,158 +11,60 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Boot control implementation for NVIDIA Jetson device boot with cboot.""" +"""Boot control implementation for NVIDIA Jetson device boots with cboot. + +Supports BSP version < R34. +""" from __future__ import annotations import logging import os -import re import subprocess -from functools import partial from pathlib import Path -from typing import Any, Generator, Literal, NamedTuple, Optional - -from pydantic import BaseModel, BeforeValidator, PlainSerializer -from typing_extensions import Annotated, Self +from typing import Generator, Optional from otaclient.app import errors as ota_errors -from otaclient.app.common import ( - copytree_identical, - subprocess_run_wrapper, - write_str_to_file_sync, -) +from otaclient.app.common import subprocess_run_wrapper from otaclient.app.proto import wrapper +from ..configs import config as cfg from ._common import CMDHelperFuncs, OTAStatusFilesControl, SlotMountHelper -from .configs import cboot_cfg as cfg +from ._jetson_common import ( + FirmwareBSPVersionControl, + NVBootctrlCommon, + NVBootctrlTarget, + SlotID, + copy_standby_slot_boot_to_internal_emmc, + parse_bsp_version, + preserve_ota_config_files_to_standby, + update_standby_slot_extlinux_cfg, +) +from .configs import cboot_cfg as boot_cfg from .protocol import BootControllerProtocol logger = logging.getLogger(__name__) -class SlotID(str): - VALID_SLOTS = ["0", "1"] - - def __new__(cls, _in: str | Self) -> Self: - if isinstance(_in, cls): - return _in - if _in in cls.VALID_SLOTS: - return str.__new__(cls, _in) - raise ValueError(f"{_in=} is not valid slot num, should be '0' or '1'.") - - -class BSPVersion(NamedTuple): - """ - example version string: R32.6.1 - """ - - major_ver: int - major_rev: int - minor_rev: int - - @classmethod - def parse(cls, _in: str | BSPVersion | Any) -> Self: - """Parse "Rxx.yy.z string into BSPVersion.""" - if isinstance(_in, cls): - return _in - if isinstance(_in, str): - major_ver, major_rev, minor_rev = _in[1:].split(".") - return cls(int(major_ver), int(major_rev), int(minor_rev)) - raise ValueError(f"expect str or BSPVersion instance, get {type(_in)}") - - @staticmethod - def dump(to_export: BSPVersion) -> str: - """Dump BSPVersion to string as "Rxx.yy.z".""" - return f"R{to_export.major_ver}.{to_export.major_rev}.{to_export.minor_rev}" - - -BSPVersionStr = Annotated[ - BSPVersion, - BeforeValidator(BSPVersion.parse), - PlainSerializer(BSPVersion.dump, return_type=str), -] - - -class FirmwareBSPVersion(BaseModel): - """ - BSP version string schema: Rxx.yy.z - """ - - slot_a: Optional[BSPVersionStr] = None - slot_b: Optional[BSPVersionStr] = None - - class JetsonCBootContrlError(Exception): """Exception types for covering jetson-cboot related errors.""" -class _NVBootctrl: +class _NVBootctrl(NVBootctrlCommon): """Helper for calling nvbootctrl commands. + For BSP version < R34. Without -t option, the target will be bootloader by default. """ - NVBOOTCTRL = "nvbootctrl" - NVBootctrlTarget = Literal["bootloader", "rootfs"] - - @classmethod - def _nvbootctrl( - cls, - _cmd: str, - _slot_id: Optional[SlotID] = None, - *, - check_output=False, - target: Optional[NVBootctrlTarget] = None, - ) -> Any: - cmd = [cls.NVBOOTCTRL] - if target: - cmd.extend(["-t", target]) - cmd.append(_cmd) - if _slot_id: - cmd.append(str(_slot_id)) - - res = subprocess_run_wrapper( - cmd, - check=True, - check_output=True, - ) - if check_output: - return res.stdout.decode().strip() - - @classmethod - def get_current_slot(cls, *, target: Optional[NVBootctrlTarget] = None) -> SlotID: - """Prints currently running SLOT.""" - cmd = "get-current-slot" - res = cls._nvbootctrl(cmd, check_output=True, target=target) - assert isinstance(res, str), f"invalid output from get-current-slot: {res}" - return SlotID(res.strip()) - @classmethod def mark_boot_successful( cls, slot_id: SlotID, *, target: Optional[NVBootctrlTarget] = None ) -> None: """Mark current slot as GOOD.""" cmd = "mark-boot-successful" - cls._nvbootctrl(cmd, slot_id, target=target) - - @classmethod - def get_standby_slot(cls, *, target: Optional[NVBootctrlTarget] = None) -> SlotID: - """Prints standby SLOT. - - NOTE: this method is implemented with nvbootctrl get-current-slot. - """ - current_slot = cls.get_current_slot(target=target) - return SlotID("0") if current_slot == "1" else SlotID("1") - - @classmethod - def set_active_boot_slot( - cls, slot_id: SlotID, *, target: Optional[NVBootctrlTarget] = None - ) -> None: - """On next boot, load and execute SLOT.""" - cmd = "set-active-boot-slot" - return cls._nvbootctrl(cmd, SlotID(slot_id), target=target) + cls._nvbootctrl(cmd, slot_id, check_output=False, target=target) @classmethod def set_slot_as_unbootable( @@ -170,13 +72,7 @@ def set_slot_as_unbootable( ) -> None: """Mark SLOT as invalid.""" cmd = "set-slot-as-unbootable" - return cls._nvbootctrl(cmd, SlotID(slot_id), target=target) - - @classmethod - def dump_slots_info(cls, *, target: Optional[NVBootctrlTarget] = None) -> str: - """Prints info for slots.""" - cmd = "dump-slots-info" - return cls._nvbootctrl(cmd, target=target, check_output=True) + return cls._nvbootctrl(cmd, SlotID(slot_id), check_output=False, target=target) @classmethod def is_unified_enabled(cls) -> bool | None: @@ -196,7 +92,7 @@ def is_unified_enabled(cls) -> bool | None: """ cmd = "is-unified-enabled" try: - cls._nvbootctrl(cmd) + cls._nvbootctrl(cmd, check_output=False) return True except subprocess.CalledProcessError as e: if e.returncode == 70: @@ -270,69 +166,6 @@ def verify_update(cls) -> subprocess.CompletedProcess[bytes]: return subprocess_run_wrapper(cmd, check=False, check_output=True) -class FirmwareBSPVersionControl: - """firmware_bsp_version ota-status file for tracking firmware version.""" - - def __init__( - self, current_firmware_bsp_vf: Path, standby_firmware_bsp_vf: Path - ) -> None: - self._current_fw_bsp_vf = current_firmware_bsp_vf - self._standby_fw_bsp_vf = standby_firmware_bsp_vf - - self._version = FirmwareBSPVersion() - try: - self._version = FirmwareBSPVersion.model_validate_json( - self._current_fw_bsp_vf.read_text() - ) - except Exception as e: - logger.warning( - f"invalid or missing firmware_bsp_verion file, removed: {e!r}" - ) - self._current_fw_bsp_vf.unlink(missing_ok=True) - - def write_current_firmware_bsp_version(self) -> None: - """Write instance firmware_bsp_version to firmware_bsp_version file.""" - write_str_to_file_sync(self._current_fw_bsp_vf, self._version.model_dump_json()) - - def write_standby_firmware_bsp_version(self) -> None: - """Write instance firmware_bsp_version to firmware_bsp_version file.""" - write_str_to_file_sync(self._standby_fw_bsp_vf, self._version.model_dump_json()) - - def get_version_by_slot(self, slot_id: SlotID) -> Optional[BSPVersion]: - if slot_id == "0": - return self._version.slot_a - return self._version.slot_b - - def set_version_by_slot(self, slot_id: SlotID, version: Optional[BSPVersion]): - if slot_id == "0": - self._version.slot_a = version - else: - self._version.slot_b = version - - -BSP_VER_PA = re.compile( - ( - r"# R(?P\d+) \(\w+\), REVISION: (?P\d+)\.(?P\d+), " - r"GCID: (?P\d+), BOARD: (?P\w+), EABI: (?P\w+)" - ) -) -"""Example: # R32 (release), REVISION: 6.1, GCID: 27863751, BOARD: t186ref, EABI: aarch64, DATE: Mon Jul 26 19:36:31 UTC 2021 """ - - -def parse_bsp_version(nv_tegra_release: str) -> BSPVersion: - """Get current BSP version from contents of /etc/nv_tegra_release. - - see https://developer.nvidia.com/embedded/jetson-linux-archive for BSP version history. - """ - ma = BSP_VER_PA.match(nv_tegra_release) - assert ma, f"invalid nv_tegra_release content: {nv_tegra_release}" - return BSPVersion( - int(ma.group("major_ver")), - int(ma.group("major_rev")), - int(ma.group("minor_rev")), - ) - - class _CBootControl: MMCBLK_DEV_PREFIX = "mmcblk" # internal emmc NVMESSD_DEV_PREFIX = "nvme" # external nvme ssd @@ -341,15 +174,17 @@ class _CBootControl: def __init__(self): # ------ sanity check, confirm we are at jetson device ------ # - if not os.path.exists(cfg.TEGRA_CHIP_ID_PATH): - _err_msg = f"not a jetson device, {cfg.TEGRA_CHIP_ID_PATH} doesn't exist" + if not os.path.exists(boot_cfg.TEGRA_CHIP_ID_PATH): + _err_msg = ( + f"not a jetson device, {boot_cfg.TEGRA_CHIP_ID_PATH} doesn't exist" + ) logger.error(_err_msg) raise JetsonCBootContrlError(_err_msg) # ------ check BSP version ------ # try: self.bsp_version = bsp_version = parse_bsp_version( - Path(cfg.NV_TEGRA_RELEASE_FPATH).read_text() + Path(boot_cfg.NV_TEGRA_RELEASE_FPATH).read_text() ) except Exception as e: _err_msg = f"failed to detect BSP version: {e!r}" @@ -428,9 +263,9 @@ def __init__(self): self._external_rootfs = False parent_devname = parent_devpath.name - if parent_devname.startswith(self.MMCBLK_DEV_PREFIX): + if parent_devname.startswith(boot_cfg.MMCBLK_DEV_PREFIX): logger.info(f"device boots from internal emmc: {parent_devpath}") - elif parent_devname.startswith(self.NVMESSD_DEV_PREFIX): + elif parent_devname.startswith(boot_cfg.NVMESSD_DEV_PREFIX): logger.info(f"device boots from external nvme ssd: {parent_devpath}") self._external_rootfs = True else: @@ -458,7 +293,7 @@ def __init__(self): ) # internal emmc partition - self.standby_internal_emmc_devpath = f"/dev/{self.INTERNAL_EMMC_DEVNAME}p{self._slot_id_partid[standby_rootfs_slot]}" + self.standby_internal_emmc_devpath = f"/dev/{boot_cfg.INTERNAL_EMMC_DEVNAME}p{self._slot_id_partid[standby_rootfs_slot]}" logger.info(f"finished cboot control init: {current_rootfs_slot=}") logger.info(f"nvbootctrl dump-slots-info: \n{_NVBootctrl.dump_slots_info()}") @@ -493,25 +328,6 @@ def switch_boot_to_standby(self) -> None: # the rootfs slot. _NVBootctrl.set_active_boot_slot(target_slot) - @staticmethod - def update_extlinux_cfg(_input: str, partuuid: str) -> str: - """Update input exlinux text with input rootfs .""" - - partuuid_str = f"PARTUUID={partuuid}" - - def _replace(ma: re.Match, repl: str): - append_l: str = ma.group(0) - if append_l.startswith("#"): - return append_l - res, n = re.compile(r"root=[\w\-=]*").subn(repl, append_l) - if not n: # this APPEND line doesn't contain root= placeholder - res = f"{append_l} {repl}" - - return res - - _repl_func = partial(_replace, repl=f"root={partuuid_str}") - return re.compile(r"\n\s*APPEND.*").sub(_repl_func, _input) - class JetsonCBootControl(BootControllerProtocol): """BootControllerProtocol implementation for jetson-cboot.""" @@ -528,17 +344,17 @@ def __init__(self) -> None: active_slot_dev=self._cboot_control.curent_rootfs_devpath, active_slot_mount_point=cfg.ACTIVE_ROOT_MOUNT_POINT, ) - current_ota_status_dir = Path(cfg.OTA_STATUS_DIR) + current_ota_status_dir = Path(boot_cfg.OTA_STATUS_DIR) standby_ota_status_dir = self._mp_control.standby_slot_mount_point / Path( - cfg.OTA_STATUS_DIR + boot_cfg.OTA_STATUS_DIR ).relative_to("/") # load firmware BSP version from current rootfs slot self._firmware_ver_control = FirmwareBSPVersionControl( current_firmware_bsp_vf=current_ota_status_dir - / cfg.FIRMWARE_BSP_VERSION_FNAME, + / boot_cfg.FIRMWARE_BSP_VERSION_FNAME, standby_firmware_bsp_vf=standby_ota_status_dir - / cfg.FIRMWARE_BSP_VERSION_FNAME, + / boot_cfg.FIRMWARE_BSP_VERSION_FNAME, ) # init ota-status files @@ -589,75 +405,6 @@ def _finalize_switching_boot(self) -> bool: ) return True - def _copy_standby_slot_boot_to_internal_emmc(self): - """Copy the standby slot's /boot to internal emmc dev. - - This method is involved when external rootfs is enabled, aligning with - the behavior of the NVIDIA flashing script. - - WARNING: DO NOT call this method if we are not booted from external rootfs! - NOTE: at the time this method is called, the /boot folder at - standby slot rootfs MUST be fully setup! - """ - # mount corresponding internal emmc device - internal_emmc_mp = Path(cfg.SEPARATE_BOOT_MOUNT_POINT) - internal_emmc_mp.mkdir(exist_ok=True, parents=True) - internal_emmc_devpath = self._cboot_control.standby_internal_emmc_devpath - - try: - if CMDHelperFuncs.is_target_mounted( - internal_emmc_devpath, raise_exception=False - ): - logger.debug("internal emmc device is mounted, try to unmount ...") - CMDHelperFuncs.umount(internal_emmc_devpath, raise_exception=False) - CMDHelperFuncs.mount_rw(internal_emmc_devpath, internal_emmc_mp) - except Exception as e: - _msg = f"failed to mount standby internal emmc dev: {e!r}" - logger.error(_msg) - raise JetsonCBootContrlError(_msg) from e - - try: - dst = internal_emmc_mp / "boot" - src = self._mp_control.standby_slot_mount_point / "boot" - # copy the standby slot's boot folder to emmc boot dev - copytree_identical(src, dst) - except Exception as e: - _msg = f"failed to populate standby slot's /boot folder to standby internal emmc dev: {e!r}" - logger.error(_msg) - raise JetsonCBootContrlError(_msg) from e - finally: - CMDHelperFuncs.umount(internal_emmc_mp, raise_exception=False) - - def _preserve_ota_config_files_to_standby(self): - """Preserve /boot/ota to standby /boot folder.""" - src = self._mp_control.active_slot_mount_point / "boot" / "ota" - if not src.is_dir(): # basically this should not happen - logger.warning(f"{src} doesn't exist, skip preserve /boot/ota folder.") - return - - dst = self._mp_control.standby_slot_mount_point / "boot" / "ota" - # TODO: (20240411) reconsidering should we preserve /boot/ota? - copytree_identical(src, dst) - - def _update_standby_slot_extlinux_cfg(self): - """update standby slot's /boot/extlinux/extlinux.conf to update root indicator.""" - src = standby_slot_extlinux = self._mp_control.standby_slot_mount_point / Path( - cfg.EXTLINUX_FILE - ).relative_to("/") - # if standby slot doesn't have extlinux.conf installed, use current booted - # extlinux.conf as template source. - if not standby_slot_extlinux.is_file(): - src = Path(cfg.EXTLINUX_FILE) - - # update the extlinux.conf with standby slot rootfs' partuuid - write_str_to_file_sync( - standby_slot_extlinux, - self._cboot_control.update_extlinux_cfg( - src.read_text(), - self._cboot_control.standby_rootfs_dev_partuuid, - ), - ) - def _nv_firmware_update(self) -> Optional[bool]: """Perform firmware update with nv_update_engine. @@ -674,7 +421,7 @@ def _nv_firmware_update(self) -> Optional[bool]: # ------ check if we need to do firmware update ------ # _new_bsp_v_fpath = self._mp_control.standby_slot_mount_point / Path( - cfg.NV_TEGRA_RELEASE_FPATH + boot_cfg.NV_TEGRA_RELEASE_FPATH ).relative_to("/") try: new_bsp_v = parse_bsp_version(_new_bsp_v_fpath.read_text()) @@ -692,11 +439,11 @@ def _nv_firmware_update(self) -> Optional[bool]: # ------ preform firmware update ------ # firmware_dpath = self._mp_control.standby_slot_mount_point / Path( - cfg.FIRMWARE_DPATH + boot_cfg.FIRMWARE_DPATH ).relative_to("/") _firmware_applied = False - for firmware_fname in cfg.FIRMWARE_LIST: + for firmware_fname in boot_cfg.FIRMWARE_LIST: if (firmware_fpath := firmware_dpath / firmware_fname).is_file(): logger.info(f"nv_firmware: apply {firmware_fpath} ...") try: @@ -774,7 +521,12 @@ def post_update(self) -> Generator[None, None, None]: try: logger.info("jetson-cboot: post-update ...") # ------ update extlinux.conf ------ # - self._update_standby_slot_extlinux_cfg() + update_standby_slot_extlinux_cfg( + active_slot_extlinux_fpath=Path(boot_cfg.EXTLINUX_FILE), + standby_slot_extlinux_fpath=self._mp_control.standby_slot_mount_point + / Path(boot_cfg.EXTLINUX_FILE).relative_to("/"), + standby_slot_partuuid=self._cboot_control.standby_rootfs_dev_partuuid, + ) # ------ firmware update ------ # _fw_update_result = self._nv_firmware_update() @@ -786,7 +538,14 @@ def post_update(self) -> Generator[None, None, None]: raise JetsonCBootContrlError("firmware update failed") # ------ preserve /boot/ota folder to standby rootfs ------ # - self._preserve_ota_config_files_to_standby() + preserve_ota_config_files_to_standby( + active_slot_ota_dirpath=self._mp_control.active_slot_mount_point + / "boot" + / "ota", + standby_slot_ota_dirpath=self._mp_control.standby_slot_mount_point + / "boot" + / "ota", + ) # ------ for external rootfs, preserve /boot folder to internal ------ # if self._cboot_control._external_rootfs: @@ -795,7 +554,14 @@ def post_update(self) -> Generator[None, None, None]: "copy standby slot rootfs' /boot folder " "to corresponding internal emmc dev ..." ) - self._copy_standby_slot_boot_to_internal_emmc() + copy_standby_slot_boot_to_internal_emmc( + internal_emmc_mp=Path(boot_cfg.SEPARATE_BOOT_MOUNT_POINT), + internal_emmc_devpath=Path( + self._cboot_control.standby_internal_emmc_devpath + ), + standby_slot_boot_dirpath=self._mp_control.standby_slot_mount_point + / "boot", + ) # ------ switch boot to standby ------ # self._cboot_control.switch_boot_to_standby() diff --git a/otaclient/app/boot_control/_jetson_common.py b/otaclient/app/boot_control/_jetson_common.py new file mode 100644 index 000000000..6a5b26bf1 --- /dev/null +++ b/otaclient/app/boot_control/_jetson_common.py @@ -0,0 +1,348 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Jetson device boot control implementation common. + +This module is shared by jetson-cboot and jetson-uefi bootloader type. +""" + + +from __future__ import annotations + +import logging +import re +import subprocess +from functools import partial +from pathlib import Path +from typing import Any, NamedTuple, Optional + +from pydantic import BaseModel, BeforeValidator, PlainSerializer +from typing_extensions import Annotated, Literal, Self + +from otaclient.app.common import write_str_to_file_sync + +from ..common import copytree_identical +from ._common import CMDHelperFuncs + +logger = logging.getLogger(__name__) + + +class SlotID(str): + """slot_id for A/B slots. + + On NVIDIA Jetson device, slot_a has slot_id=0, slot_b has slot_id=1. + For slot_a, the slot partition name suffix is "" or "_a". + For slot_b, the slot partition name suffix is "_b". + """ + + VALID_SLOTS = ["0", "1"] + + def __new__(cls, _in: str | Self) -> Self: + if isinstance(_in, cls): + return _in + if _in in cls.VALID_SLOTS: + return str.__new__(cls, _in) + raise ValueError(f"{_in=} is not valid slot num, should be '0' or '1'.") + + +class BSPVersion(NamedTuple): + """BSP version in NamedTuple representation. + + Example: R32.6.1 -> (32, 6, 1) + """ + + major_ver: int + major_rev: int + minor_rev: int + + @classmethod + def parse(cls, _in: str | BSPVersion | Any) -> Self: + """Parse "Rxx.yy.z string into BSPVersion.""" + if isinstance(_in, cls): + return _in + if isinstance(_in, str): + major_ver, major_rev, minor_rev = _in[1:].split(".") + return cls(int(major_ver), int(major_rev), int(minor_rev)) + raise ValueError(f"expect str or BSPVersion instance, get {type(_in)}") + + @staticmethod + def dump(to_export: BSPVersion) -> str: + """Dump BSPVersion to string as "Rxx.yy.z".""" + return f"R{to_export.major_ver}.{to_export.major_rev}.{to_export.minor_rev}" + + +BSPVersionStr = Annotated[ + BSPVersion, + BeforeValidator(BSPVersion.parse), + PlainSerializer(BSPVersion.dump, return_type=str), +] +"""BSPVersion in string representation, used by FirmwareBSPVersion model.""" + + +class FirmwareBSPVersion(BaseModel): + """ + BSP version string schema: Rxx.yy.z + """ + + slot_a: Optional[BSPVersionStr] = None + slot_b: Optional[BSPVersionStr] = None + + +NVBootctrlTarget = Literal["bootloader", "rootfs"] + + +class NVBootctrlCommon: + """Helper for calling nvbootctrl commands. + + Without -t option, the target will be bootloader by default. + + The NVBootctrlCommon class only contains methods that both exist on + jetson-cboot and jetson-uefi, which are: + + 1. get-current-slot + 2. set-active-boot-slot + 3. dump-slots-info + + Also, get-standby-slot is not a nvbootctrl and it is implemented using + nvbootctrl get-current-slot command. + """ + + NVBOOTCTRL = "nvbootctrl" + + @classmethod + def _nvbootctrl( + cls, + _cmd: str, + _slot_id: Optional[SlotID] = None, + *, + check_output, + target: Optional[NVBootctrlTarget] = None, + ) -> Any: + cmd = [cls.NVBOOTCTRL] + if target: + cmd.extend(["-t", target]) + cmd.append(_cmd) + if _slot_id: + cmd.append(str(_slot_id)) + + res = subprocess.run( + cmd, + check=True, + capture_output=True, + ) + if check_output: + return res.stdout.decode() + return + + @classmethod + def get_current_slot(cls, *, target: Optional[NVBootctrlTarget] = None) -> SlotID: + """Prints currently running SLOT.""" + cmd = "get-current-slot" + res = cls._nvbootctrl(cmd, check_output=True, target=target) + assert isinstance(res, str), f"invalid output from get-current-slot: {res}" + return SlotID(res.strip()) + + @classmethod + def get_standby_slot(cls, *, target: Optional[NVBootctrlTarget] = None) -> SlotID: + """Prints standby SLOT. + + NOTE: this method is implemented with nvbootctrl get-current-slot. + """ + current_slot = cls.get_current_slot(target=target) + return SlotID("0") if current_slot == "1" else SlotID("1") + + @classmethod + def set_active_boot_slot( + cls, slot_id: SlotID, *, target: Optional[NVBootctrlTarget] = None + ) -> None: + """On next boot, load and execute SLOT.""" + cmd = "set-active-boot-slot" + return cls._nvbootctrl(cmd, SlotID(slot_id), check_output=False, target=target) + + @classmethod + def dump_slots_info(cls, *, target: Optional[NVBootctrlTarget] = None) -> str: + """Prints info for slots.""" + cmd = "dump-slots-info" + return cls._nvbootctrl(cmd, target=target, check_output=True) + + +class FirmwareBSPVersionControl: + """firmware_bsp_version ota-status file for tracking firmware version. + + The firmware BSP version is stored in /boot/ota-status/firmware_bsp_version json file, + tracking the firmware BSP version for each slot. + + Each slot should keep the same firmware_bsp_version file, this file is passed to standby slot + during OTA update. + """ + + def __init__( + self, current_firmware_bsp_vf: Path, standby_firmware_bsp_vf: Path + ) -> None: + self._current_fw_bsp_vf = current_firmware_bsp_vf + self._standby_fw_bsp_vf = standby_firmware_bsp_vf + + self._version = FirmwareBSPVersion() + try: + self._version = _version = FirmwareBSPVersion.model_validate_json( + self._current_fw_bsp_vf.read_text() + ) + logger.info(f"firmware_version: {_version}") + except Exception as e: + logger.warning( + f"invalid or missing firmware_bsp_verion file, removed: {e!r}" + ) + self._current_fw_bsp_vf.unlink(missing_ok=True) + + def write_current_firmware_bsp_version(self) -> None: + """Write firmware_bsp_version from memory to firmware_bsp_version file.""" + write_str_to_file_sync(self._current_fw_bsp_vf, self._version.model_dump_json()) + + def write_standby_firmware_bsp_version(self) -> None: + """Write firmware_bsp_version from memory to firmware_bsp_version file.""" + write_str_to_file_sync(self._standby_fw_bsp_vf, self._version.model_dump_json()) + + def get_version_by_slot(self, slot_id: SlotID) -> Optional[BSPVersion]: + """Get slot's firmware version from memory.""" + if slot_id == "0": + return self._version.slot_a + return self._version.slot_b + + def set_version_by_slot( + self, slot_id: SlotID, version: Optional[BSPVersion] + ) -> None: + """Set slot's firmware version into memory.""" + if slot_id == "0": + self._version.slot_a = version + else: + self._version.slot_b = version + + +BSP_VER_PA = re.compile( + ( + r"# R(?P\d+) \(\w+\), REVISION: (?P\d+)\.(?P\d+), " + r"GCID: (?P\d+), BOARD: (?P\w+), EABI: (?P\w+)" + ) +) +"""Example: # R32 (release), REVISION: 6.1, GCID: 27863751, BOARD: t186ref, EABI: aarch64, DATE: Mon Jul 26 19:36:31 UTC 2021 """ + + +def parse_bsp_version(nv_tegra_release: str) -> BSPVersion: + """Parse BSP version from contents of /etc/nv_tegra_release. + + see https://developer.nvidia.com/embedded/jetson-linux-archive for BSP version history. + """ + ma = BSP_VER_PA.match(nv_tegra_release) + assert ma, f"invalid nv_tegra_release content: {nv_tegra_release}" + return BSPVersion( + int(ma.group("major_ver")), + int(ma.group("major_rev")), + int(ma.group("minor_rev")), + ) + + +def update_extlinux_cfg(_input: str, partuuid: str) -> str: + """Update input exlinux text with input rootfs .""" + partuuid_str = f"PARTUUID={partuuid}" + + def _replace(ma: re.Match, repl: str): + append_l: str = ma.group(0) + if append_l.startswith("#"): + return append_l + res, n = re.compile(r"root=[\w\-=]*").subn(repl, append_l) + if not n: # this APPEND line doesn't contain root= placeholder + res = f"{append_l} {repl}" + + return res + + _repl_func = partial(_replace, repl=f"root={partuuid_str}") + return re.compile(r"\n\s*APPEND.*").sub(_repl_func, _input) + + +def copy_standby_slot_boot_to_internal_emmc( + *, + internal_emmc_mp: Path | str, + internal_emmc_devpath: Path | str, + standby_slot_boot_dirpath: Path | str, +) -> None: + """Copy the standby slot's /boot to internal emmc dev. + + This method is involved when external rootfs is enabled, aligning with + the behavior of the NVIDIA flashing script. + + WARNING: DO NOT call this method if we are not booted from external rootfs! + NOTE: at the time this method is called, the /boot folder at + standby slot rootfs MUST be fully setup! + """ + internal_emmc_mp = Path(internal_emmc_mp) + internal_emmc_mp.mkdir(exist_ok=True, parents=True) + + try: + CMDHelperFuncs.umount(internal_emmc_devpath, raise_exception=False) + CMDHelperFuncs.mount_rw( + target=str(internal_emmc_devpath), + mount_point=internal_emmc_mp, + ) + except Exception as e: + _msg = f"failed to mount standby internal emmc dev: {e!r}" + logger.error(_msg) + raise ValueError(_msg) from e + + try: + dst = internal_emmc_mp / "boot" + # copy the standby slot's boot folder to emmc boot dev + copytree_identical(Path(standby_slot_boot_dirpath), dst) + except Exception as e: + _msg = f"failed to populate standby slot's /boot folder to standby internal emmc dev: {e!r}" + logger.error(_msg) + raise ValueError(_msg) from e + finally: + CMDHelperFuncs.umount(internal_emmc_mp, raise_exception=False) + + +def preserve_ota_config_files_to_standby( + *, active_slot_ota_dirpath: Path, standby_slot_ota_dirpath: Path +) -> None: + """Preserve /boot/ota to standby /boot folder.""" + if not active_slot_ota_dirpath.is_dir(): # basically this should not happen + logger.warning( + f"{active_slot_ota_dirpath} doesn't exist, skip preserve /boot/ota folder." + ) + return + # TODO: (20240411) reconsidering should we preserve /boot/ota? + copytree_identical(active_slot_ota_dirpath, standby_slot_ota_dirpath) + + +def update_standby_slot_extlinux_cfg( + *, + active_slot_extlinux_fpath: Path, + standby_slot_extlinux_fpath: Path, + standby_slot_partuuid: str, +): + """update standby slot's /boot/extlinux/extlinux.conf to update root indicator.""" + src = standby_slot_extlinux_fpath + # if standby slot doesn't have extlinux.conf installed, use current booted + # extlinux.conf as template source. + if not standby_slot_extlinux_fpath.is_file(): + logger.warning( + f"{standby_slot_extlinux_fpath} doesn't exist, use active slot's extlinux file as template" + ) + src = active_slot_extlinux_fpath + + write_str_to_file_sync( + standby_slot_extlinux_fpath, + update_extlinux_cfg( + src.read_text(), + standby_slot_partuuid, + ), + ) diff --git a/otaclient/app/boot_control/configs.py b/otaclient/app/boot_control/configs.py index d44862d56..4369990bd 100644 --- a/otaclient/app/boot_control/configs.py +++ b/otaclient/app/boot_control/configs.py @@ -15,8 +15,7 @@ from __future__ import annotations -from dataclasses import dataclass, field -from typing import Dict, List +from dataclasses import dataclass from otaclient.configs.ecu_info import BootloaderType @@ -36,26 +35,30 @@ class GrubControlConfig(BaseConfig): BOOT_OTA_PARTITION_FILE: str = "ota-partition" -@dataclass -class JetsonCBootControlConfig(BaseConfig): +class JetsonBootCommon: + TEGRA_CHIP_ID_PATH = "/sys/module/tegra_fuse/parameters/tegra_chip_id" + OTA_STATUS_DIR = "/boot/ota-status" + FIRMWARE_BSP_VERSION_FNAME = "firmware_bsp_version" + EXTLINUX_FILE = "/boot/extlinux/extlinux.conf" + FIRMWARE_DPATH = "/opt/ota_package" + """Refer to standby slot rootfs.""" + + NV_TEGRA_RELEASE_FPATH = "/etc/nv_tegra_release" + SEPARATE_BOOT_MOUNT_POINT = "/mnt/standby_boot" + + MMCBLK_DEV_PREFIX = "mmcblk" # internal emmc + NVMESSD_DEV_PREFIX = "nvme" # external nvme ssd + INTERNAL_EMMC_DEVNAME = "mmcblk0" + + +class JetsonCBootControlConfig(JetsonBootCommon): """Jetson device booted with cboot. Suuports BSP version < R34. """ - BOOTLOADER: BootloaderType = BootloaderType.CBOOT - TEGRA_CHIP_ID_PATH: str = "/sys/module/tegra_fuse/parameters/tegra_chip_id" - CHIP_ID_MODEL_MAP: Dict[int, str] = field(default_factory=lambda: {0x19: "rqx_580"}) - OTA_STATUS_DIR: str = "/boot/ota-status" - FIRMWARE_BSP_VERSION_FNAME: str = "firmware_bsp_version" - EXTLINUX_FILE: str = "/boot/extlinux/extlinux.conf" - SEPARATE_BOOT_MOUNT_POINT: str = "/mnt/standby_boot" - # refer to the standby slot - FIRMWARE_DPATH: str = "/opt/ota_package" - FIRMWARE_LIST: List[str] = field( - default_factory=lambda: ["bl_only_payload", "xusb_only_payload"] - ) - NV_TEGRA_RELEASE_FPATH: str = "/etc/nv_tegra_release" + BOOTLOADER = BootloaderType.JETSON_CBOOT + FIRMWARE_LIST = ["bl_only_payload", "xusb_only_payload"] @dataclass diff --git a/tests/test_boot_control/test_jetson_cboot.py b/tests/test_boot_control/test_jetson_cboot.py index 001f7517d..8183eb57f 100644 --- a/tests/test_boot_control/test_jetson_cboot.py +++ b/tests/test_boot_control/test_jetson_cboot.py @@ -25,12 +25,13 @@ import pytest from otaclient.app.boot_control import _jetson_cboot -from otaclient.app.boot_control._jetson_cboot import ( +from otaclient.app.boot_control._jetson_cboot import _CBootControl +from otaclient.app.boot_control._jetson_common import ( BSPVersion, FirmwareBSPVersion, SlotID, - _CBootControl, parse_bsp_version, + update_extlinux_cfg, ) logger = logging.getLogger(__name__) @@ -140,4 +141,4 @@ def test_parse_bsp_version(_in: str, expected: BSPVersion): def test_update_extlinux_conf(_template_f: Path, _updated_f: Path, partuuid: str): _in = (TEST_DIR / _template_f).read_text() _expected = (TEST_DIR / _updated_f).read_text() - assert _CBootControl.update_extlinux_cfg(_in, partuuid) == _expected + assert update_extlinux_cfg(_in, partuuid) == _expected