From 855ec231c4dc263081137cbeace72883790f7cbe Mon Sep 17 00:00:00 2001 From: Bodong Yang <86948717+Bodong-Yang@users.noreply.github.com> Date: Tue, 3 Sep 2024 17:07:44 +0900 Subject: [PATCH] feat: add jetson-uefi boot control support, refine jetson-cboot boot control implementation (#300) This PR introduces the OTA support for NVIDIA Jetson device with UEFI(BSP >= R34) and UEFI Capsule firmware update(BSP >=R35.2). Together with #376, this PR also implements the firmware update protocol for both jetson-uefi and jetson-cboot boot control. Other Major changes includes: 1. refactor jetson-common module. 2. fully refactor the firmware update implementation in jetson-cboot module, with the newly refactored jetson-common module. 3. refine firmware_package to better meet the usages in jetson-uefi and jetson-cboot. Other Minor changes includes: 1. minor fix on boot_control._common _load_status_file. 2. otaclient_common: implement file_digest, use file_digest to implement file_sha256. --- src/otaclient/boot_control/_common.py | 2 +- .../boot_control/_firmware_package.py | 112 +- src/otaclient/boot_control/_jetson_cboot.py | 493 +++++--- src/otaclient/boot_control/_jetson_common.py | 288 ++++- src/otaclient/boot_control/_jetson_uefi.py | 1112 +++++++++++++++++ src/otaclient/boot_control/configs.py | 34 +- src/otaclient/boot_control/selecter.py | 6 + src/otaclient/configs/ecu_info.py | 1 + src/otaclient_common/__init__.py | 4 +- src/otaclient_common/common.py | 26 +- .../test_firmware_package.py | 38 +- .../test_boot_control/test_jetson_cboot.py | 144 --- .../test_boot_control/test_jetson_common.py | 322 +++++ .../test_boot_control/test_jetson_uefi.py | 307 +++++ tests/test_otaclient_common/test_common.py | 22 + 15 files changed, 2442 insertions(+), 469 deletions(-) create mode 100644 src/otaclient/boot_control/_jetson_uefi.py delete mode 100644 tests/test_otaclient/test_boot_control/test_jetson_cboot.py create mode 100644 tests/test_otaclient/test_boot_control/test_jetson_common.py create mode 100644 tests/test_otaclient/test_boot_control/test_jetson_uefi.py diff --git a/src/otaclient/boot_control/_common.py b/src/otaclient/boot_control/_common.py index 3921a4754..32dbe3215 100644 --- a/src/otaclient/boot_control/_common.py +++ b/src/otaclient/boot_control/_common.py @@ -503,7 +503,7 @@ def _load_status_file(self): _loaded_ota_status = None # initialize ota_status files if not presented/incompleted/invalid - if not _loaded_ota_status: + if _loaded_ota_status is None: logger.info( "ota_status files incompleted/not presented, " f"initializing and set/store status to {api_types.StatusOta.INITIALIZED.name}..." diff --git a/src/otaclient/boot_control/_firmware_package.py b/src/otaclient/boot_control/_firmware_package.py index cdc46268f..431fc290a 100644 --- a/src/otaclient/boot_control/_firmware_package.py +++ b/src/otaclient/boot_control/_firmware_package.py @@ -42,43 +42,49 @@ from __future__ import annotations +import logging import re from enum import Enum -from typing import Any, List, Literal +from pathlib import Path +from typing import Any, List, Literal, Union -from pydantic import BaseModel, BeforeValidator, GetCoreSchemaHandler -from pydantic_core import CoreSchema, core_schema +import yaml +from pydantic import BaseModel, BeforeValidator from typing_extensions import Annotated -from otaclient_common.typing import gen_strenum_validator +from otaclient_common.typing import StrOrPath, gen_strenum_validator + +logger = logging.getLogger(__name__) class PayloadType(str, Enum): UEFI_CAPSULE = "UEFI-CAPSULE" UEFI_BOOT_APP = "UEFI-BOOT-APP" + BUP = "BUP" DIGEST_PATTERN = re.compile(r"^(?P[\w\-+ ]+):(?P[a-zA-Z0-9]+)$") -def _pydantic_str_schema( - cls, source_type: Any, handler: GetCoreSchemaHandler -) -> CoreSchema: - """Pydantic schema adapter for str.""" - return core_schema.no_info_after_validator_function(cls, handler(str)) - - -class DigestValue(str): +class DigestValue(BaseModel): """Implementation of digest value schema :.""" - def __init__(self, _in: str) -> None: - _in = _in.strip() - if not (ma := DIGEST_PATTERN.match(_in)): - raise ValueError(f"invalid digest value: {_in}") - self.algorithm = ma.group("algorithm") - self.digest = ma.group("digest") + algorithm: str + digest: str - __get_pydantic_core_schema__ = classmethod(_pydantic_str_schema) + @classmethod + def parse(cls, _in: str | DigestValue | Any) -> DigestValue: + if isinstance(_in, DigestValue): + return _in + + if isinstance(_in, str): + _in = _in.strip() + if not (ma := DIGEST_PATTERN.match(_in)): + raise ValueError(f"invalid digest value: {_in}") + return DigestValue( + algorithm=ma.group("algorithm"), digest=ma.group("digest") + ) + raise TypeError(f"expect instance of str or {cls}, get {type(_in)}") class NVIDIAFirmwareCompat(BaseModel): @@ -95,13 +101,13 @@ def check_compat(self, _tnspec: str) -> bool: ) -class NVIDIAUEFIFirmwareSpec(BaseModel): - # NOTE: let the jetson-uefi module parse the bsp_version +class NVIDIAFirmwareSpec(BaseModel): + # NOTE: let the boot control module parse BSP version bsp_version: str firmware_compat: NVIDIAFirmwareCompat -class PayloadFileLocation(str): +class PayloadFileLocation(BaseModel): """Specifying the payload file location. It supports file URL or digest value. @@ -113,26 +119,33 @@ class PayloadFileLocation(str): """ location_type: Literal["blob", "file"] - location_path: str | DigestValue + location_path: Union[str, DigestValue] - def __init__(self, _in: str) -> None: - if _in.startswith("file://"): - self.location_type = "file" - self.location_path = _in.replace("file://", "", 1) - else: - self.location_type = "blob" - self.location_path = DigestValue(_in) + @classmethod + def parse(cls, _in: str | PayloadFileLocation | Any) -> PayloadFileLocation: + if isinstance(_in, PayloadFileLocation): + return _in - __get_pydantic_core_schema__ = classmethod(_pydantic_str_schema) + if isinstance(_in, str): + if _in.startswith("file://"): + location_type = "file" + location_path = _in.replace("file://", "", 1) + else: + location_type = "blob" + location_path = DigestValue.parse(_in) + return cls(location_type=location_type, location_path=location_path) + raise TypeError(f"expect instance of str of {cls}, get {type(_in)}") class FirmwarePackage(BaseModel): """Metadata of a firmware update package payload.""" payload_name: str - file_location: Annotated[PayloadFileLocation, BeforeValidator(PayloadFileLocation)] + file_location: Annotated[ + PayloadFileLocation, BeforeValidator(PayloadFileLocation.parse) + ] type: Annotated[PayloadType, BeforeValidator(gen_strenum_validator(PayloadType))] - digest: DigestValue + digest: Annotated[DigestValue, BeforeValidator(DigestValue.parse)] class HardwareType(str, Enum): @@ -156,7 +169,7 @@ class FirmwareManifest(BaseModel): ] hardware_series: str hardware_model: str - firmware_spec: NVIDIAUEFIFirmwareSpec + firmware_spec: NVIDIAFirmwareSpec firmware_packages: List[FirmwarePackage] def check_compat(self, _tnspec: str) -> bool: @@ -186,3 +199,34 @@ class FirmwareUpdateRequest(BaseModel): format_version: Literal[1] = 1 firmware_list: List[str] + + +def load_firmware_package( + *, + firmware_update_request_fpath: StrOrPath, + firmware_manifest_fpath: StrOrPath, +) -> tuple[FirmwareUpdateRequest, FirmwareManifest] | None: + """Parse firmware update package.""" + try: + firmware_update_request = FirmwareUpdateRequest.model_validate( + yaml.safe_load(Path(firmware_update_request_fpath).read_text()) + ) + except FileNotFoundError: + logger.info("firmware update request file not found, skip firmware update") + return + except Exception as e: + logger.warning(f"invalid request file: {e!r}") + return + + try: + firmware_manifest = FirmwareManifest.model_validate( + yaml.safe_load(Path(firmware_manifest_fpath).read_text()) + ) + except FileNotFoundError: + logger.warning("no firmware manifest file presented, skip firmware update!") + return + except Exception as e: + logger.warning(f"invalid manifest file: {e!r}") + return + + return firmware_update_request, firmware_manifest diff --git a/src/otaclient/boot_control/_jetson_cboot.py b/src/otaclient/boot_control/_jetson_cboot.py index 867b0d389..f61315d5f 100644 --- a/src/otaclient/boot_control/_jetson_cboot.py +++ b/src/otaclient/boot_control/_jetson_cboot.py @@ -20,24 +20,37 @@ from __future__ import annotations import logging -import os import subprocess from pathlib import Path from typing import Generator, Optional from otaclient.app import errors as ota_errors from otaclient.app.configs import config as cfg +from otaclient.boot_control._firmware_package import ( + FirmwareManifest, + FirmwareUpdateRequest, + PayloadType, + load_firmware_package, +) from otaclient_api.v2 import types as api_types -from otaclient_common.common import subprocess_run_wrapper +from otaclient_common import replace_root +from otaclient_common.common import file_digest, subprocess_run_wrapper +from otaclient_common.typing import StrOrPath from ._common import CMDHelperFuncs, OTAStatusFilesControl, SlotMountHelper from ._jetson_common import ( + SLOT_PAR_MAP, + BSPVersion, FirmwareBSPVersionControl, NVBootctrlCommon, + NVBootctrlExecError, NVBootctrlTarget, SlotID, copy_standby_slot_boot_to_internal_emmc, - parse_bsp_version, + detect_external_rootdev, + detect_rootfs_bsp_version, + get_nvbootctrl_conf_tnspec, + get_partition_devpath, preserve_ota_config_files_to_standby, update_standby_slot_extlinux_cfg, ) @@ -46,12 +59,18 @@ logger = logging.getLogger(__name__) +MAXIMUM_SUPPORTED_BSP_VERSION_EXCLUDE = BSPVersion(34, 0, 0) +"""After R34, cboot is replaced by UEFI. + +Also, cboot firmware update with nvupdate engine is supported up to this version(exclude). +""" + class JetsonCBootContrlError(Exception): """Exception types for covering jetson-cboot related errors.""" -class _NVBootctrl(NVBootctrlCommon): +class NVBootctrlJetsonCBOOT(NVBootctrlCommon): """Helper for calling nvbootctrl commands. For BSP version < R34. @@ -61,21 +80,29 @@ class _NVBootctrl(NVBootctrlCommon): @classmethod def mark_boot_successful( cls, slot_id: SlotID, *, target: Optional[NVBootctrlTarget] = None - ) -> None: + ) -> None: # pragma: no cover """Mark current slot as GOOD.""" cmd = "mark-boot-successful" - cls._nvbootctrl(cmd, slot_id, check_output=False, target=target) + try: + cls._nvbootctrl(cmd, slot_id, check_output=False, target=target) + except subprocess.CalledProcessError as e: + logger.warning(f"nvbootctrl {cmd} call failed: {e!r}") @classmethod def set_slot_as_unbootable( cls, slot_id: SlotID, *, target: Optional[NVBootctrlTarget] = None - ) -> None: + ) -> None: # pragma: no cover """Mark SLOT as invalid.""" cmd = "set-slot-as-unbootable" - return cls._nvbootctrl(cmd, SlotID(slot_id), check_output=False, target=target) + try: + return cls._nvbootctrl( + cmd, SlotID(slot_id), check_output=False, target=target + ) + except subprocess.CalledProcessError as e: + logger.warning(f"nvbootctrl {cmd} call failed: {e!r}") @classmethod - def is_unified_enabled(cls) -> bool | None: + def is_unified_enabled(cls) -> bool | None: # pragma: no cover """Returns 0 only if unified a/b is enabled. NOTE: this command is available after BSP R32.6.1. @@ -97,11 +124,9 @@ def is_unified_enabled(cls) -> bool | None: except subprocess.CalledProcessError as e: if e.returncode == 70: return False - elif e.returncode == 69: + if e.returncode == 69: return - raise ValueError( - f"{cmd} returns unexpected result: {e.returncode=}, {e!r}" - ) from None + logger.warning(f"{cmd} returns unexpected result: {e.returncode=}, {e!r}") class NVUpdateEngine: @@ -148,11 +173,95 @@ def _nv_update_engine_unified_ab(cls, payload: Path | str): ) ) - @classmethod - def apply_firmware_update(cls, payload: Path | str, *, unified_ab: bool) -> None: - if unified_ab: - return cls._nv_update_engine_unified_ab(payload) - return cls._nv_update_engine(payload) + def __init__( + self, + standby_slot_mp: StrOrPath, + *, + tnspec: str, + firmware_update_request: FirmwareUpdateRequest, + firmware_manifest: FirmwareManifest, + unify_ab: bool, + ) -> None: + self._standby_slot_mp = standby_slot_mp + self._tnspec = tnspec + self._firmware_update_request = firmware_update_request + self._firmware_manifest = firmware_manifest + self._unify_ab = unify_ab + + def firmware_update(self) -> bool: + """Perform firmware update if needed. + + Returns: + True if firmware update is performed, False if there is no firmware update. + """ + firmware_bsp_ver = BSPVersion.parse( + self._firmware_manifest.firmware_spec.bsp_version + ) + if firmware_bsp_ver >= MAXIMUM_SUPPORTED_BSP_VERSION_EXCLUDE: + logger.warning( + f"firmware package has {firmware_bsp_ver}, " + f"which newer or equal to {MAXIMUM_SUPPORTED_BSP_VERSION_EXCLUDE}. " + f"firmware update to {MAXIMUM_SUPPORTED_BSP_VERSION_EXCLUDE} or newer is NOT supported, " + "skip firmware update" + ) + return False + + # check firmware compatibility, this is to prevent failed firmware update beforehand. + if not self._firmware_manifest.check_compat(self._tnspec): + _err_msg = ( + "firmware package is incompatible with this device: " + f"{self._tnspec=}, {self._firmware_manifest.firmware_spec.firmware_compat}, " + "skip firmware update" + ) + logger.warning(_err_msg) + return False + + update_execute_func = ( + self._nv_update_engine_unified_ab + if self._unify_ab + else self._nv_update_engine + ) + + firmware_update_executed = False + for update_payload in self._firmware_manifest.get_firmware_packages( + self._firmware_update_request + ): + if update_payload.type != PayloadType.BUP: + continue + + # NOTE: currently we only support payload indicated by file path. + bup_flocation = update_payload.file_location + bup_fpath = bup_flocation.location_path + assert bup_flocation.location_type == "file" and isinstance(bup_fpath, str) + + payload_digest_alg, payload_digest_value = ( + update_payload.digest.algorithm, + update_payload.digest.digest, + ) + + # bup is located at the OTA image + bup_fpath = replace_root( + bup_fpath, + "/", + self._standby_slot_mp, + ) + if not Path(bup_fpath).is_file(): + logger.warning(f"{bup_fpath=} doesn't exist! skip...") + continue + + _digest = file_digest(bup_fpath, algorithm=payload_digest_alg) + if _digest != payload_digest_value: + logger.warning( + f"{payload_digest_alg} validation failed, " + f"expect {payload_digest_value}, get {_digest}, " + f"skip apply {update_payload.payload_name}" + ) + continue + + logger.warning(f"apply BUP {bup_fpath} to standby slot ...") + update_execute_func(bup_fpath) + firmware_update_executed = True + return firmware_update_executed @classmethod def verify_update(cls) -> subprocess.CompletedProcess[bytes]: @@ -169,82 +278,74 @@ def verify_update(cls) -> subprocess.CompletedProcess[bytes]: class _CBootControl: - MMCBLK_DEV_PREFIX = "mmcblk" # internal emmc - NVMESSD_DEV_PREFIX = "nvme" # external nvme ssd - INTERNAL_EMMC_DEVNAME = "mmcblk0" - _slot_id_partid = {SlotID("0"): "1", SlotID("1"): "2"} - def __init__(self): - # ------ sanity check, confirm we are at jetson device ------ # - if not os.path.exists(boot_cfg.TEGRA_CHIP_ID_PATH): - _err_msg = ( - f"not a jetson device, {boot_cfg.TEGRA_CHIP_ID_PATH} doesn't exist" - ) - logger.error(_err_msg) - raise JetsonCBootContrlError(_err_msg) - # ------ check BSP version ------ # + # NOTE(20240821): unfortunately, we don't have proper method to detect + # the firmware BSP version < R34, so we assume that the rootfs BSP version is the + # same as the firmware BSP version. try: - self.bsp_version = bsp_version = parse_bsp_version( - Path(boot_cfg.NV_TEGRA_RELEASE_FPATH).read_text() + self.rootfs_bsp_version = rootfs_bsp_version = detect_rootfs_bsp_version( + rootfs=cfg.ACTIVE_ROOTFS_PATH ) except Exception as e: _err_msg = f"failed to detect BSP version: {e!r}" logger.error(_err_msg) raise JetsonCBootContrlError(_err_msg) from None - logger.info(f"{bsp_version=}") + logger.info(f"{rootfs_bsp_version=}") # ------ sanity check, jetson-cboot is not used after BSP R34 ------ # - if not bsp_version < (34, 0, 0): + if rootfs_bsp_version >= MAXIMUM_SUPPORTED_BSP_VERSION_EXCLUDE: _err_msg = ( - f"jetson-cboot only supports BSP version < R34, but get {bsp_version=}. " - "Please use jetson-uefi bootloader type for device with BSP >= R34." + f"jetson-cboot only supports BSP version < {MAXIMUM_SUPPORTED_BSP_VERSION_EXCLUDE}," + f" but get {rootfs_bsp_version=}. " + f"Please use jetson-uefi control for device with BSP >= {MAXIMUM_SUPPORTED_BSP_VERSION_EXCLUDE}." ) logger.error(_err_msg) raise JetsonCBootContrlError(_err_msg) + # ------ load nvbootctrl config file ------ # + if not ( + nvbootctrl_conf_fpath := Path(boot_cfg.NVBOOTCTRL_CONF_FPATH) + ).is_file(): + _err_msg = "nv_boot_ctrl.conf is missing!" + logger.error(_err_msg) + raise JetsonCBootContrlError(_err_msg) + self.nvbootctrl_conf = nvbootctrl_conf_fpath.read_text() + logger.info(f"nvboot_ctrl_conf: \n{self.nvbootctrl_conf}") + # ------ check if unified A/B is enabled ------ # # NOTE: mismatch rootfs BSP version and bootloader firmware BSP version # is NOT supported and MUST not occur. - unified_ab_enabled = False - if bsp_version >= (32, 6, 0): - # NOTE: unified A/B is supported starting from r32.6 - unified_ab_enabled = _NVBootctrl.is_unified_enabled() - if unified_ab_enabled is None: - _err_msg = "rootfs A/B is not enabled!" - logger.error(_err_msg) - raise JetsonCBootContrlError(_err_msg) - else: # R32.5 and below doesn't support unified A/B - try: - _NVBootctrl.get_current_slot(target="rootfs") - except subprocess.CalledProcessError: - _err_msg = "rootfs A/B is not enabled!" - logger.error(_err_msg) - raise JetsonCBootContrlError(_err_msg) from None - self.unified_ab_enabled = unified_ab_enabled - - if unified_ab_enabled: - logger.info( - "unified A/B is enabled, rootfs and bootloader will be switched together" - ) + self.unified_ab_enabled = False + if rootfs_bsp_version >= BSPVersion(32, 6, 1): + if unified_ab_enabled := NVBootctrlJetsonCBOOT.is_unified_enabled(): + logger.info( + "unified A/B is enabled, rootfs and bootloader will be switched together" + ) + self.unified_ab_enabled = unified_ab_enabled # ------ check A/B slots ------ # - self.current_bootloader_slot = current_bootloader_slot = ( - _NVBootctrl.get_current_slot() - ) - self.standby_bootloader_slot = standby_bootloader_slot = ( - _NVBootctrl.get_standby_slot() - ) - if not unified_ab_enabled: - self.current_rootfs_slot = current_rootfs_slot = ( - _NVBootctrl.get_current_slot(target="rootfs") + try: + self.current_bootloader_slot = current_bootloader_slot = ( + NVBootctrlJetsonCBOOT.get_current_slot() ) - self.standby_rootfs_slot = standby_rootfs_slot = ( - _NVBootctrl.get_standby_slot(target="rootfs") + self.standby_bootloader_slot = standby_bootloader_slot = ( + NVBootctrlJetsonCBOOT.get_standby_slot() ) - else: - self.current_rootfs_slot = current_rootfs_slot = current_bootloader_slot - self.standby_rootfs_slot = standby_rootfs_slot = standby_bootloader_slot + if not unified_ab_enabled: + self.current_rootfs_slot = current_rootfs_slot = ( + NVBootctrlJetsonCBOOT.get_current_slot(target="rootfs") + ) + self.standby_rootfs_slot = standby_rootfs_slot = ( + NVBootctrlJetsonCBOOT.get_standby_slot(target="rootfs") + ) + else: + self.current_rootfs_slot = current_rootfs_slot = current_bootloader_slot + self.standby_rootfs_slot = standby_rootfs_slot = standby_bootloader_slot + except NVBootctrlExecError as e: + _err_msg = f"failed to detect slot info: {e!r}" + logger.error(_err_msg) + raise JetsonCBootContrlError(_err_msg) from e # check if rootfs slot and bootloader slot mismatches, this only happens # when unified_ab is not enabled. @@ -256,37 +357,36 @@ def __init__(self): logger.warning("this might indicates a failed previous firmware update") # ------ detect rootfs_dev and parent_dev ------ # - self.curent_rootfs_devpath = current_rootfs_devpath = ( - CMDHelperFuncs.get_current_rootfs_dev() - ) - self.parent_devpath = parent_devpath = Path( - CMDHelperFuncs.get_parent_dev(current_rootfs_devpath) - ) - - self._external_rootfs = False - parent_devname = parent_devpath.name - if parent_devname.startswith(boot_cfg.MMCBLK_DEV_PREFIX): - logger.info(f"device boots from internal emmc: {parent_devpath}") - elif parent_devname.startswith(boot_cfg.NVMESSD_DEV_PREFIX): - logger.info(f"device boots from external nvme ssd: {parent_devpath}") - self._external_rootfs = True - else: - _err_msg = f"we don't support boot from {parent_devpath=} currently" - logger.error(_err_msg) - raise JetsonCBootContrlError(_err_msg) from NotImplementedError( - f"unsupported bootdev {parent_devpath}" + try: + self.curent_rootfs_devpath = current_rootfs_devpath = ( + CMDHelperFuncs.get_current_rootfs_dev() + ) + self.parent_devpath = parent_devpath = Path( + CMDHelperFuncs.get_parent_dev(current_rootfs_devpath) ) + except Exception as e: + _err_msg = f"failed to detect rootfs: {e!r}" + logger.error(_err_msg) + raise JetsonCBootContrlError(_err_msg) from e + + self._external_rootfs = detect_external_rootdev(parent_devpath) # rootfs partition - self.standby_rootfs_devpath = ( - f"/dev/{parent_devname}p{self._slot_id_partid[standby_rootfs_slot]}" - ) - self.standby_rootfs_dev_partuuid = CMDHelperFuncs.get_attrs_by_dev( - "PARTUUID", f"{self.standby_rootfs_devpath}" - ) - current_rootfs_dev_partuuid = CMDHelperFuncs.get_attrs_by_dev( - "PARTUUID", current_rootfs_devpath - ) + try: + self.standby_rootfs_devpath = get_partition_devpath( + parent_devpath=parent_devpath, + partition_id=SLOT_PAR_MAP[standby_rootfs_slot], + ) + self.standby_rootfs_dev_partuuid = CMDHelperFuncs.get_attrs_by_dev( + "PARTUUID", self.standby_rootfs_devpath + ).strip() + current_rootfs_dev_partuuid = CMDHelperFuncs.get_attrs_by_dev( + "PARTUUID", current_rootfs_devpath + ).strip() + except Exception as e: + _err_msg = f"failed to detect rootfs dev partuuid: {e!r}" + logger.error(_err_msg) + raise JetsonCBootContrlError(_err_msg) from e logger.info( "finish detecting rootfs devs: \n" @@ -295,13 +395,17 @@ def __init__(self): ) # internal emmc partition - self.standby_internal_emmc_devpath = f"/dev/{boot_cfg.INTERNAL_EMMC_DEVNAME}p{self._slot_id_partid[standby_rootfs_slot]}" - + self.standby_internal_emmc_devpath = get_partition_devpath( + parent_devpath=f"/dev/{boot_cfg.INTERNAL_EMMC_DEVNAME}", + partition_id=SLOT_PAR_MAP[standby_rootfs_slot], + ) logger.info(f"finished cboot control init: {current_rootfs_slot=}") - logger.info(f"nvbootctrl dump-slots-info: \n{_NVBootctrl.dump_slots_info()}") + logger.info( + f"nvbootctrl dump-slots-info: \n{NVBootctrlJetsonCBOOT.dump_slots_info()}" + ) if not unified_ab_enabled: logger.info( - f"nvbootctrl -t rootfs dump-slots-info: \n{_NVBootctrl.dump_slots_info(target='rootfs')}" + f"nvbootctrl -t rootfs dump-slots-info: \n{NVBootctrlJetsonCBOOT.dump_slots_info(target='rootfs')}" ) # API @@ -316,7 +420,9 @@ def external_rootfs_enabled(self) -> bool: return self._external_rootfs def set_standby_rootfs_unbootable(self): - _NVBootctrl.set_slot_as_unbootable(self.standby_rootfs_slot, target="rootfs") + NVBootctrlJetsonCBOOT.set_slot_as_unbootable( + self.standby_rootfs_slot, target="rootfs" + ) def switch_boot_to_standby(self) -> None: # NOTE(20240412): we always try to align bootloader slot with rootfs. @@ -324,11 +430,14 @@ def switch_boot_to_standby(self) -> None: logger.info(f"switch boot to standby slot({target_slot})") if not self.unified_ab_enabled: - _NVBootctrl.set_active_boot_slot(target_slot, target="rootfs") + logger.info( + f"unified AB slot is not enabled, also set active rootfs slot to ${target_slot}" + ) + NVBootctrlJetsonCBOOT.set_active_boot_slot(target_slot, target="rootfs") # when unified_ab enabled, switching bootloader slot will also switch # the rootfs slot. - _NVBootctrl.set_active_boot_slot(target_slot) + NVBootctrlJetsonCBOOT.set_active_boot_slot(target_slot) class JetsonCBootControl(BootControllerProtocol): @@ -337,7 +446,7 @@ class JetsonCBootControl(BootControllerProtocol): def __init__(self) -> None: try: # startup boot controller - self._cboot_control = _CBootControl() + self._cboot_control = cboot_control = _CBootControl() # mount point prepare self._mp_control = SlotMountHelper( @@ -346,20 +455,16 @@ def __init__(self) -> None: active_slot_dev=self._cboot_control.curent_rootfs_devpath, active_slot_mount_point=cfg.ACTIVE_ROOT_MOUNT_POINT, ) - current_ota_status_dir = Path(boot_cfg.OTA_STATUS_DIR) - standby_ota_status_dir = self._mp_control.standby_slot_mount_point / Path( - boot_cfg.OTA_STATUS_DIR - ).relative_to("/") - - # load firmware BSP version from current rootfs slot - self._firmware_ver_control = FirmwareBSPVersionControl( - current_firmware_bsp_vf=current_ota_status_dir - / boot_cfg.FIRMWARE_BSP_VERSION_FNAME, - standby_firmware_bsp_vf=standby_ota_status_dir - / boot_cfg.FIRMWARE_BSP_VERSION_FNAME, - ) # init ota-status files + current_ota_status_dir = Path(boot_cfg.OTA_STATUS_DIR) + standby_ota_status_dir = Path( + replace_root( + boot_cfg.OTA_STATUS_DIR, + "/", + cfg.MOUNT_POINT, + ) + ) self._ota_status_control = OTAStatusFilesControl( active_slot=str(self._cboot_control.current_rootfs_slot), standby_slot=str(self._cboot_control.standby_rootfs_slot), @@ -368,6 +473,26 @@ def __init__(self) -> None: standby_ota_status_dir=standby_ota_status_dir, finalize_switching_boot=self._finalize_switching_boot, ) + + # load firmware BSP version + current_fw_bsp_ver_fpath = ( + current_ota_status_dir / boot_cfg.FIRMWARE_BSP_VERSION_FNAME + ) + self._firmware_bsp_ver_control = bsp_ver_ctrl = FirmwareBSPVersionControl( + current_slot=cboot_control.current_bootloader_slot, + # NOTE: see comments at L240-242 + current_slot_bsp_ver=cboot_control.rootfs_bsp_version, + current_bsp_version_file=current_fw_bsp_ver_fpath, + ) + # always update the bsp_version_file on startup to reflect + # the up-to-date current slot BSP version + self._firmware_bsp_ver_control.write_to_file(current_fw_bsp_ver_fpath) + logger.info( + f"\ncurrent slot firmware BSP version: {bsp_ver_ctrl.current_slot_bsp_ver}\n" + f"standby slot firmware BSP version: {bsp_ver_ctrl.standby_slot_bsp_ver}" + ) + + logger.info("jetson-cboot boot control start up finished") except Exception as e: _err_msg = f"failed to start jetson-cboot controller: {e!r}" raise ota_errors.BootControlStartupFailed(_err_msg, module=__name__) from e @@ -379,7 +504,6 @@ def _finalize_switching_boot(self) -> bool: Also if unified A/B is NOT enabled and everything is alright, execute mark-boot-success to mark the current booted rootfs boots successfully. """ - current_boot_slot = self._cboot_control.current_bootloader_slot current_rootfs_slot = self._cboot_control.current_rootfs_slot update_result = NVUpdateEngine.verify_update() @@ -391,98 +515,64 @@ def _finalize_switching_boot(self) -> bool: "failing the OTA and clear firmware version due to new bootloader slot boot failed." ) logger.error(_err_msg) - - # NOTE: always only change current slots firmware_bsp_version file here. - self._firmware_ver_control.set_version_by_slot(current_boot_slot, None) - self._firmware_ver_control.write_current_firmware_bsp_version() return False # NOTE(20240417): rootfs slot is manually switched by set-active-boot-slot, # so we need to manually set the slot as success after first reboot. if not self._cboot_control.unified_ab_enabled: - _NVBootctrl.mark_boot_successful(current_rootfs_slot, target="rootfs") + NVBootctrlJetsonCBOOT.mark_boot_successful( + current_rootfs_slot, target="rootfs" + ) logger.info( f"nv_update_engine verify succeeded: \n{update_result.stdout.decode()}" ) return True - def _nv_firmware_update(self) -> Optional[bool]: - """Perform firmware update with nv_update_engine. + def _firmware_update(self) -> bool | None: + """Perform firmware update with nv_update_engine if needed. Returns: True if firmware update applied, False for failed firmware update, None for no firmware update occurs. """ logger.info("jetson-cboot: entering nv firmware update ...") - standby_bootloader_slot = self._cboot_control.standby_bootloader_slot - standby_firmware_bsp_ver = self._firmware_ver_control.get_version_by_slot( - standby_bootloader_slot - ) - logger.info(f"{standby_bootloader_slot=} BSP ver: {standby_firmware_bsp_ver}") - - # ------ check if we need to do firmware update ------ # - _new_bsp_v_fpath = self._mp_control.standby_slot_mount_point / Path( - boot_cfg.NV_TEGRA_RELEASE_FPATH - ).relative_to("/") - try: - new_bsp_v = parse_bsp_version(_new_bsp_v_fpath.read_text()) - except Exception as e: - logger.warning(f"failed to detect new image's BSP version: {e!r}") - logger.warning("skip firmware update due to new image BSP version unknown") + tnspec = get_nvbootctrl_conf_tnspec(self._cboot_control.nvbootctrl_conf) + if not tnspec: + logger.warning("tnspec is not defined, skip firmware update!") return - logger.info(f"BUP package version: {new_bsp_v=}") - if standby_firmware_bsp_ver and standby_firmware_bsp_ver >= new_bsp_v: - logger.info( - f"{standby_bootloader_slot=} has newer or equal ver of firmware, skip firmware update" - ) + firmware_package_meta = load_firmware_package( + firmware_update_request_fpath=replace_root( + boot_cfg.FIRMWARE_UPDATE_REQUEST_FPATH, + "/", + self._mp_control.standby_slot_mount_point, + ), + firmware_manifest_fpath=replace_root( + boot_cfg.FIRMWARE_MANIFEST_FPATH, + "/", + self._mp_control.standby_slot_mount_point, + ), + ) + if firmware_package_meta is None: + logger.info("skip firmware update ...") return + firmware_update_request, firmware_manifest = firmware_package_meta # ------ preform firmware update ------ # - firmware_dpath = self._mp_control.standby_slot_mount_point / Path( - boot_cfg.FIRMWARE_DPATH - ).relative_to("/") - - _firmware_applied = False - for firmware_fname in boot_cfg.FIRMWARE_LIST: - if (firmware_fpath := firmware_dpath / firmware_fname).is_file(): - logger.info(f"nv_firmware: apply {firmware_fpath} ...") - try: - NVUpdateEngine.apply_firmware_update( - firmware_fpath, - unified_ab=bool(self._cboot_control.unified_ab_enabled), - ) - _firmware_applied = True - except subprocess.CalledProcessError as e: - _err_msg = ( - f"failed to apply BUP {firmware_fpath}: {e!r}, \n" - f"stderr={e.stderr.decode()}, \n" - f"stdout={e.stdout.decode()}" - ) - logger.error(_err_msg) - logger.warning("firmware update interrupted, failing OTA...") - - # if the firmware update is interrupted halfway(some of the BUP is applied), - # revert bootloader slot switch - if _firmware_applied: - logger.warning( - "revert bootloader slot switch to current active slot" - ) - _NVBootctrl.set_active_boot_slot( - self._cboot_control.current_bootloader_slot - ) - return False - - # ------ register new firmware version ------ # - if _firmware_applied: - logger.info( - f"nv_firmware: successfully apply firmware to {self._cboot_control.standby_rootfs_slot=}" - ) - self._firmware_ver_control.set_version_by_slot( - standby_bootloader_slot, new_bsp_v - ) - return True + fw_update_bsp_ver = BSPVersion.parse( + firmware_manifest.firmware_spec.bsp_version + ) + logger.info(f"firmware update package BSP version: {fw_update_bsp_ver}") + + firmware_updater = NVUpdateEngine( + standby_slot_mp=self._mp_control.standby_slot_mount_point, + tnspec=tnspec, + firmware_update_request=firmware_update_request, + firmware_manifest=firmware_manifest, + unify_ab=bool(self._cboot_control.unified_ab_enabled), + ) + return firmware_updater.firmware_update() # APIs @@ -501,6 +591,9 @@ def pre_update(self, version: str, *, standby_as_ref: bool, erase_standby: bool) if not self._cboot_control.unified_ab_enabled: # set standby rootfs as unbootable as we are going to update it # this operation not applicable when unified A/B is enabled. + logger.info( + "unified AB slot is not enabled, set standby rootfs slot as unbootable" + ) self._cboot_control.set_standby_rootfs_unbootable() # prepare standby slot dev @@ -530,16 +623,20 @@ def post_update(self) -> Generator[None, None, None]: ) # ------ firmware update ------ # - firmware_update_result = self._nv_firmware_update() - if firmware_update_result: - self._firmware_ver_control.write_current_firmware_bsp_version() - elif firmware_update_result is None: + firmware_update_result = self._firmware_update() + if firmware_update_result is None: logger.info("no firmware update occurs") - else: + elif firmware_update_result is False: raise JetsonCBootContrlError("firmware update failed") + else: + logger.info("new firmware is written to the standby slot") # ------ preserve BSP version files to standby slot ------ # - self._firmware_ver_control.write_standby_firmware_bsp_version() + standby_fw_bsp_ver_fpath = ( + self._ota_status_control.standby_ota_status_dir + / boot_cfg.FIRMWARE_BSP_VERSION_FNAME + ) + self._firmware_bsp_ver_control.write_to_file(standby_fw_bsp_ver_fpath) # ------ preserve /boot/ota folder to standby rootfs ------ # preserve_ota_config_files_to_standby( @@ -573,7 +670,7 @@ def post_update(self) -> Generator[None, None, None]: # ------ prepare to reboot ------ # self._mp_control.umount_all(ignore_error=True) - logger.info(f"[post-update]: \n{_NVBootctrl.dump_slots_info()}") + logger.info(f"[post-update]: \n{NVBootctrlJetsonCBOOT.dump_slots_info()}") logger.info("post update finished, wait for reboot ...") yield # hand over control back to otaclient CMDHelperFuncs.reboot() diff --git a/src/otaclient/boot_control/_jetson_common.py b/src/otaclient/boot_control/_jetson_common.py index 5971c3692..cf13e06e6 100644 --- a/src/otaclient/boot_control/_jetson_common.py +++ b/src/otaclient/boot_control/_jetson_common.py @@ -29,9 +29,12 @@ from pydantic import BaseModel, BeforeValidator, PlainSerializer from typing_extensions import Annotated, Literal, Self +from otaclient_common import replace_root from otaclient_common.common import copytree_identical, write_str_to_file_sync +from otaclient_common.typing import StrOrPath from ._common import CMDHelperFuncs +from .configs import jetson_common_cfg logger = logging.getLogger(__name__) @@ -39,19 +42,32 @@ class SlotID(str): """slot_id for A/B slots. - On NVIDIA Jetson device, slot_a has slot_id=0, slot_b has slot_id=1. - For slot_a, the slot partition name suffix is "" or "_a". - For slot_b, the slot partition name suffix is "_b". + slot_a has slot_id=0, slot_b has slot_id=1. """ VALID_SLOTS = ["0", "1"] + VALID_SLOTS_CHAR = ["A", "B"] def __new__(cls, _in: str | Self) -> Self: if isinstance(_in, cls): return _in if _in in cls.VALID_SLOTS: return str.__new__(cls, _in) - raise ValueError(f"{_in=} is not valid slot num, should be '0' or '1'.") + if _in in cls.VALID_SLOTS_CHAR: + return str.__new__(cls, "0") if _in == "A" else str.__new__(cls, "1") + raise ValueError( + f"{_in=} is not valid slot num, should be '0'('A') or '1'('B')." + ) + + +SLOT_A, SLOT_B = SlotID("0"), SlotID("1") +SLOT_FLIP = {SLOT_A: SLOT_B, SLOT_B: SLOT_A} +SLOT_PAR_MAP = {SLOT_A: 1, SLOT_B: 2} +"""SLOT_A: 1, SLOT_B: 2""" + +BSP_VERSION_STR_PA = re.compile( + r"[rR]?(?P\d+)\.(?P\d+)\.(?P\d+)" +) class BSPVersion(NamedTuple): @@ -66,18 +82,29 @@ class BSPVersion(NamedTuple): @classmethod def parse(cls, _in: str | BSPVersion | Any) -> Self: - """Parse "Rxx.yy.z string into BSPVersion.""" + """Parse BSP version string into BSPVersion. + + Raises: + ValueError on invalid input. + """ if isinstance(_in, cls): return _in if isinstance(_in, str): - major_ver, major_rev, minor_rev = _in[1:].split(".") + ma = BSP_VERSION_STR_PA.match(_in) + if not ma: + raise ValueError(f"not a valid bsp version string: {_in}") + + major_ver, major_rev, minor_rev = ( + ma.group("major_ver"), + ma.group("major_rev"), + ma.group("minor_rev"), + ) return cls(int(major_ver), int(major_rev), int(minor_rev)) raise ValueError(f"expect str or BSPVersion instance, get {type(_in)}") - @staticmethod - def dump(to_export: BSPVersion) -> str: + def dump(self) -> str: """Dump BSPVersion to string as "Rxx.yy.z".""" - return f"R{to_export.major_ver}.{to_export.major_rev}.{to_export.minor_rev}" + return f"R{self.major_ver}.{self.major_rev}.{self.minor_rev}" BSPVersionStr = Annotated[ @@ -85,10 +112,10 @@ def dump(to_export: BSPVersion) -> str: BeforeValidator(BSPVersion.parse), PlainSerializer(BSPVersion.dump, return_type=str), ] -"""BSPVersion in string representation, used by FirmwareBSPVersion model.""" +"""BSPVersion in string representation.""" -class FirmwareBSPVersion(BaseModel): +class SlotBSPVersion(BaseModel): """ BSP version string schema: Rxx.yy.z """ @@ -96,10 +123,30 @@ class FirmwareBSPVersion(BaseModel): slot_a: Optional[BSPVersionStr] = None slot_b: Optional[BSPVersionStr] = None + def set_by_slot(self, slot_id: SlotID, ver: BSPVersion | None) -> None: + if slot_id == SLOT_A: + self.slot_a = ver + elif slot_id == SLOT_B: + self.slot_b = ver + else: + raise ValueError(f"invalid slot_id: {slot_id}") + + def get_by_slot(self, slot_id: SlotID) -> BSPVersion | None: + if slot_id == SLOT_A: + return self.slot_a + elif slot_id == SLOT_B: + return self.slot_b + else: + raise ValueError(f"invalid slot_id: {slot_id}") + NVBootctrlTarget = Literal["bootloader", "rootfs"] +class NVBootctrlExecError(Exception): + """Raised when nvbootctrl command execution failed.""" + + class NVBootctrlCommon: """Helper for calling nvbootctrl commands. @@ -124,9 +171,13 @@ def _nvbootctrl( _cmd: str, _slot_id: Optional[SlotID] = None, *, - check_output, + check_output: bool, target: Optional[NVBootctrlTarget] = None, - ) -> Any: + ) -> Any: # pragma: no cover + """ + Raises: + CalledProcessError on return code not equal to 0. + """ cmd = [cls.NVBOOTCTRL] if target: cmd.extend(["-t", target]) @@ -141,93 +192,134 @@ def _nvbootctrl( ) if check_output: return res.stdout.decode() - return @classmethod - def get_current_slot(cls, *, target: Optional[NVBootctrlTarget] = None) -> SlotID: - """Prints currently running SLOT.""" + def get_current_slot( + cls, *, target: Optional[NVBootctrlTarget] = None + ) -> SlotID: # pragma: no cover + """Prints currently running SLOT. + + Raises: + NVBootctrlExecError on failed to get current slot. + """ cmd = "get-current-slot" - res = cls._nvbootctrl(cmd, check_output=True, target=target) - assert isinstance(res, str), f"invalid output from get-current-slot: {res}" - return SlotID(res.strip()) + try: + res = cls._nvbootctrl(cmd, check_output=True, target=target) + return SlotID(res.strip()) + except Exception as e: + raise NVBootctrlExecError from e @classmethod - def get_standby_slot(cls, *, target: Optional[NVBootctrlTarget] = None) -> SlotID: + def get_standby_slot( + cls, *, target: Optional[NVBootctrlTarget] = None + ) -> SlotID: # pragma: no cover """Prints standby SLOT. NOTE: this method is implemented with nvbootctrl get-current-slot. + + Raises: + NVBootctrlExecError on failed to get current slot. """ current_slot = cls.get_current_slot(target=target) - return SlotID("0") if current_slot == "1" else SlotID("1") + return SLOT_FLIP[current_slot] @classmethod def set_active_boot_slot( cls, slot_id: SlotID, *, target: Optional[NVBootctrlTarget] = None - ) -> None: - """On next boot, load and execute SLOT.""" + ) -> None: # pragma: no cover + """On next boot, load and execute SLOT. + + Raises: + NVBootctrlExecError on nvbootctrl call failed. + """ cmd = "set-active-boot-slot" - return cls._nvbootctrl(cmd, SlotID(slot_id), check_output=False, target=target) + try: + return cls._nvbootctrl( + cmd, SlotID(slot_id), check_output=False, target=target + ) + except subprocess.CalledProcessError as e: + raise NVBootctrlExecError from e @classmethod - def dump_slots_info(cls, *, target: Optional[NVBootctrlTarget] = None) -> str: - """Prints info for slots.""" + def dump_slots_info( + cls, *, target: Optional[NVBootctrlTarget] = None + ) -> str: # pragma: no cover + """Prints info for slots. + + Raises: + NVBootctrlExecError on nvbootctrl call failed. + """ cmd = "dump-slots-info" - return cls._nvbootctrl(cmd, target=target, check_output=True) + try: + return cls._nvbootctrl(cmd, target=target, check_output=True) + except subprocess.CalledProcessError as e: + raise NVBootctrlExecError from e class FirmwareBSPVersionControl: - """firmware_bsp_version ota-status file for tracking firmware version. + """firmware_bsp_version ota-status file for tracking BSP version. - The firmware BSP version is stored in /boot/ota-status/firmware_bsp_version json file, + The BSP version is stored in /boot/ota-status/firmware_bsp_version json file, tracking the firmware BSP version for each slot. + NOTE that we only cares about firmware BSP version. + + Unfortunately, we don't have method to detect standby slot's firwmare vesrion, + when the firmware_bsp_version file is not presented(typical case when we have newly setup device), + we ASSUME that both slots are running the same BSP version of firmware. Each slot should keep the same firmware_bsp_version file, this file is passed to standby slot - during OTA update. + during OTA update as it. """ def __init__( - self, current_firmware_bsp_vf: Path, standby_firmware_bsp_vf: Path + self, + current_slot: SlotID, + current_slot_bsp_ver: BSPVersion, + *, + current_bsp_version_file: Path, ) -> None: - self._current_fw_bsp_vf = current_firmware_bsp_vf - self._standby_fw_bsp_vf = standby_firmware_bsp_vf + self.current_slot, self.standby_slot = current_slot, SLOT_FLIP[current_slot] - self._version = FirmwareBSPVersion() + self._version = SlotBSPVersion() try: - self._version = _version = FirmwareBSPVersion.model_validate_json( - self._current_fw_bsp_vf.read_text() + self._version = SlotBSPVersion.model_validate_json( + current_bsp_version_file.read_text() ) - logger.info(f"firmware_version: {_version}") except Exception as e: + logger.warning(f"invalid or missing bsp_verion file: {e!r}") + current_bsp_version_file.unlink(missing_ok=True) logger.warning( - f"invalid or missing firmware_bsp_verion file, removed: {e!r}" + "assume standby slot is running the same version of firmware" ) - self._current_fw_bsp_vf.unlink(missing_ok=True) + self._version.set_by_slot(self.standby_slot, current_slot_bsp_ver) - def write_current_firmware_bsp_version(self) -> None: - """Write firmware_bsp_version from memory to firmware_bsp_version file.""" - write_str_to_file_sync(self._current_fw_bsp_vf, self._version.model_dump_json()) + # NOTE: only check the standby slot's firmware BSP version info from file, + # for current slot, always trust the value from nvbootctrl. + self._version.set_by_slot(current_slot, current_slot_bsp_ver) - def write_standby_firmware_bsp_version(self) -> None: + def write_to_file(self, fw_bsp_fpath: StrOrPath) -> None: """Write firmware_bsp_version from memory to firmware_bsp_version file.""" - write_str_to_file_sync(self._standby_fw_bsp_vf, self._version.model_dump_json()) + write_str_to_file_sync(fw_bsp_fpath, self._version.model_dump_json()) - def get_version_by_slot(self, slot_id: SlotID) -> Optional[BSPVersion]: - """Get slot's firmware version from memory.""" - if slot_id == "0": - return self._version.slot_a - return self._version.slot_b + @property + def current_slot_bsp_ver(self) -> BSPVersion: + assert (res := self._version.get_by_slot(self.current_slot)) + return res - def set_version_by_slot( - self, slot_id: SlotID, version: Optional[BSPVersion] - ) -> None: - """Set slot's firmware version into memory.""" - if slot_id == "0": - self._version.slot_a = version - else: - self._version.slot_b = version + @current_slot_bsp_ver.setter + def current_slot_bsp_ver(self, bsp_ver: BSPVersion | None): + self._version.set_by_slot(self.current_slot, bsp_ver) + + @property + def standby_slot_bsp_ver(self) -> BSPVersion | None: + return self._version.get_by_slot(self.standby_slot) + @standby_slot_bsp_ver.setter + def standby_slot_bsp_ver(self, bsp_ver: BSPVersion | None): + self._version.set_by_slot(self.standby_slot, bsp_ver) -BSP_VER_PA = re.compile( + +NV_TEGRA_RELEASE_PA = re.compile( ( r"# R(?P\d+) \(\w+\), REVISION: (?P\d+)\.(?P\d+), " r"GCID: (?P\d+), BOARD: (?P\w+), EABI: (?P\w+)" @@ -236,12 +328,12 @@ def set_version_by_slot( """Example: # R32 (release), REVISION: 6.1, GCID: 27863751, BOARD: t186ref, EABI: aarch64, DATE: Mon Jul 26 19:36:31 UTC 2021 """ -def parse_bsp_version(nv_tegra_release: str) -> BSPVersion: +def parse_nv_tegra_release(nv_tegra_release: str) -> BSPVersion: """Parse BSP version from contents of /etc/nv_tegra_release. see https://developer.nvidia.com/embedded/jetson-linux-archive for BSP version history. """ - ma = BSP_VER_PA.match(nv_tegra_release) + ma = NV_TEGRA_RELEASE_PA.match(nv_tegra_release) assert ma, f"invalid nv_tegra_release content: {nv_tegra_release}" return BSPVersion( int(ma.group("major_ver")), @@ -250,6 +342,36 @@ def parse_bsp_version(nv_tegra_release: str) -> BSPVersion: ) +def detect_rootfs_bsp_version(rootfs: StrOrPath) -> BSPVersion: + """Detect rootfs BSP version on . + + Raises: + ValueError on failed detection. + + Returns: + BSPversion of the . + """ + nv_tegra_release_fpath = replace_root( + jetson_common_cfg.NV_TEGRA_RELEASE_FPATH, + "/", + rootfs, + ) + try: + return parse_nv_tegra_release(Path(nv_tegra_release_fpath).read_text()) + except Exception as e: + _err_msg = f"failed to detect rootfs BSP version at: {rootfs}: {e!r}" + logger.error(_err_msg) + raise ValueError(_err_msg) from e + + +def get_nvbootctrl_conf_tnspec(nvbootctrl_conf: str) -> str | None: + """Get the TNSPEC field from nv_boot_control conf file.""" + for line in nvbootctrl_conf.splitlines(): + if line.strip().startswith("TNSPEC"): + _, tnspec = line.split(" ", maxsplit=1) + return tnspec.strip() + + def update_extlinux_cfg(_input: str, partuuid: str) -> str: """Update input exlinux text with input rootfs .""" partuuid_str = f"PARTUUID={partuuid}" @@ -270,9 +392,9 @@ def _replace(ma: re.Match, repl: str): def copy_standby_slot_boot_to_internal_emmc( *, - internal_emmc_mp: Path | str, - internal_emmc_devpath: Path | str, - standby_slot_boot_dirpath: Path | str, + internal_emmc_mp: StrOrPath, + internal_emmc_devpath: StrOrPath, + standby_slot_boot_dirpath: StrOrPath, ) -> None: """Copy the standby slot's /boot to internal emmc dev. @@ -318,7 +440,6 @@ def preserve_ota_config_files_to_standby( f"{active_slot_ota_dirpath} doesn't exist, skip preserve /boot/ota folder." ) return - # TODO: (20240411) reconsidering should we preserve /boot/ota? copytree_identical(active_slot_ota_dirpath, standby_slot_ota_dirpath) @@ -345,3 +466,42 @@ def update_standby_slot_extlinux_cfg( standby_slot_partuuid, ), ) + + +def detect_external_rootdev(parent_devpath: StrOrPath) -> bool: + """Check whether the ECU is using external device as root device or not. + + Returns: + True if device is booted from external NVMe SSD, False if device is booted + from internal emmc device. + """ + parent_devname = Path(parent_devpath).name + if parent_devname.startswith(jetson_common_cfg.INTERNAL_EMMC_DEVNAME): + logger.info(f"device boots from internal emmc: {parent_devpath}") + return False + logger.info(f"device boots from external device: {parent_devpath}") + return True + + +def get_partition_devpath(parent_devpath: StrOrPath, partition_id: int) -> str: + """Get partition devpath from and . + + For internal emmc like /dev/mmcblk0 with partition_id 1, we will get: + /dev/mmcblk0p1 + For external NVMe SSD like /dev/nvme0n1 with partition_id 1, we will get: + /dev/nvme0n1p1 + For other types of device, including USB drive, like /dev/sda with partition_id 1, + we will get: /dev/sda1 + """ + parent_devpath = str(parent_devpath).strip().rstrip("/") + + parent_devname = Path(parent_devpath).name + if parent_devname.startswith( + jetson_common_cfg.MMCBLK_DEV_PREFIX + ) or parent_devname.startswith(jetson_common_cfg.NVMESSD_DEV_PREFIX): + return f"{parent_devpath}p{partition_id}" + if parent_devname.startswith(jetson_common_cfg.SDX_DEV_PREFIX): + return f"{parent_devpath}{partition_id}" + + logger.warning(f"unexpected {parent_devname=}, treat it the same as sdx type") + return f"{parent_devpath}{partition_id}" diff --git a/src/otaclient/boot_control/_jetson_uefi.py b/src/otaclient/boot_control/_jetson_uefi.py new file mode 100644 index 000000000..43c8f2f2d --- /dev/null +++ b/src/otaclient/boot_control/_jetson_uefi.py @@ -0,0 +1,1112 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Boot control implementation for NVIDIA Jetson device boots with UEFI. + +jetson-uefi module currently support BSP version >= R34(which UEFI is introduced). +But firmware update is only supported after BSP R35.2. +""" + + +from __future__ import annotations + +import contextlib +import logging +import os +import re +import shutil +import subprocess +from pathlib import Path +from typing import Any, ClassVar, Generator, Literal + +from pydantic import BaseModel +from typing_extensions import Self + +from otaclient.app import errors as ota_errors +from otaclient.app.configs import config as cfg +from otaclient.boot_control._firmware_package import ( + FirmwareManifest, + FirmwareUpdateRequest, + PayloadType, + load_firmware_package, +) +from otaclient_api.v2 import types as api_types +from otaclient_common import replace_root +from otaclient_common.common import ( + file_digest, + file_sha256, + subprocess_call, + write_str_to_file_sync, +) +from otaclient_common.typing import StrOrPath + +from ._common import CMDHelperFuncs, OTAStatusFilesControl, SlotMountHelper +from ._jetson_common import ( + SLOT_PAR_MAP, + BSPVersion, + FirmwareBSPVersionControl, + NVBootctrlCommon, + NVBootctrlExecError, + SlotID, + copy_standby_slot_boot_to_internal_emmc, + detect_external_rootdev, + detect_rootfs_bsp_version, + get_nvbootctrl_conf_tnspec, + get_partition_devpath, + preserve_ota_config_files_to_standby, + update_standby_slot_extlinux_cfg, +) +from .configs import jetson_uefi_cfg as boot_cfg +from .protocol import BootControllerProtocol + +logger = logging.getLogger(__name__) + +MINIMUM_SUPPORTED_BSP_VERSION = BSPVersion(34, 0, 0) +"""Only after R34, UEFI is introduced.""" +FIRMWARE_UPDATE_MINIMUM_SUPPORTED_BSP_VERSION = BSPVersion(35, 2, 0) +"""Only after R35.2, UEFI Capsule firmware update is introduced.""" + +L4TLAUNCHER_BSP_VER_SHA256_MAP: dict[str, BSPVersion] = { + "b14fa3623f4078d05573d9dcf2a0b46ea2ae07d6b75d9843f9da6ff24db13718": BSPVersion( + 36, 3, 0 + ), + "3928e0feb84e37db87dc6705a02eec95a6fca2dccd3467b61fa66ed8e1046b67": BSPVersion( + 35, 5, 0 + ), + "ac17457772666351154a5952e3b87851a6398da2afcf3a38bedfc0925760bb0e": BSPVersion( + 35, 4, 1 + ), +} + + +class JetsonUEFIBootControlError(Exception): # pragma: no cover + """Exception type for covering jetson-uefi related errors.""" + + +class NVBootctrlJetsonUEFI(NVBootctrlCommon): + """Helper for calling nvbootctrl commands, for platform using jetson-uefi. + + Typical output of nvbootctrl dump-slots-info: + + ``` + Current version: 35.4.1 + Capsule update status: 1 + Current bootloader slot: A + Active bootloader slot: A + num_slots: 2 + slot: 0, status: normal + slot: 1, status: normal + ``` + + For BSP version >= R34 with UEFI boot. + Without -t option, the target will be bootloader by default. + """ + + @classmethod + def get_current_fw_bsp_version(cls) -> BSPVersion: + """Get current boot chain's firmware BSP version with nvbootctrl. + + Raises: + NVBootctrlExecError if failed to detect fw bsp version, + or the reported version doesn't make sense. + """ + _raw = cls.dump_slots_info() + pa = re.compile(r"Current version:\s*(?P[\.\d]+)") + + if not (ma := pa.search(_raw)): + _err_msg = f"nvbootctrl reports invalid BSP version: \n{_raw=}" + logger.error(_err_msg) + raise NVBootctrlExecError(_err_msg) + + try: + bsp_ver_str = ma.group("bsp_ver") + bsp_ver = BSPVersion.parse(bsp_ver_str) + except ValueError as e: + raise NVBootctrlExecError(f"invalid bsp version: {e!r}") from e + + if bsp_ver.major_rev == 0: + _err_msg = f"invalid BSP version: {bsp_ver_str}, this might indicate an incomplete flash!" + logger.error(_err_msg) + raise NVBootctrlExecError(_err_msg) + return bsp_ver + + @classmethod + def get_active_bootloader_slot(cls) -> SlotID | None: + """Get the active bootloader slot. + + Returns: + SlotID of active bootloader slot, or None if failed to detect. + """ + _raw = cls.dump_slots_info() + pa = re.compile(r"Active bootloader slot:\s*(?P[AB])") + + if not (ma := pa.search(_raw)): + _err_msg = f"nvbootctrl report invalid active slot: \n{_raw=}" + logger.error(_err_msg) + return + + slot_id_char = ma.group("slot_id_char") + try: + return SlotID(slot_id_char) + except ValueError as e: + _err_msg = f"failed to detect active bootloader slot: {e!r}" + logger.error(_err_msg) + return + + @classmethod + def verify(cls) -> str | None: # pragma: no cover + """Verify the bootloader and rootfs boot.""" + try: + return cls._nvbootctrl("verify", check_output=True) + except subprocess.CalledProcessError as e: + logger.warning(f"nvbootctrl verify call failed: {e!r}") + return + + +EFIVARS_FSTYPE = "efivarfs" +EFIVARS_SYS_MOUNT_POINT = "/sys/firmware/efi/efivars/" + + +@contextlib.contextmanager +def _ensure_efivarfs_mounted() -> Generator[None, Any, None]: # pragma: no cover + """Ensure the efivarfs is mounted as rw, and then umount it.""" + if CMDHelperFuncs.is_target_mounted(EFIVARS_SYS_MOUNT_POINT): + options = "remount,rw,nosuid,nodev,noexec,relatime" + else: + logger.warning( + f"efivars is not mounted! try to mount it at {EFIVARS_SYS_MOUNT_POINT}" + ) + options = "rw,nosuid,nodev,noexec,relatime" + + # fmt: off + cmd = [ + "mount", + "-t", EFIVARS_FSTYPE, + "-o", options, + EFIVARS_FSTYPE, + EFIVARS_SYS_MOUNT_POINT + ] + # fmt: on + try: + subprocess_call(cmd, raise_exception=True) + yield + except Exception as e: + raise JetsonUEFIBootControlError( + f"failed to mount {EFIVARS_FSTYPE} on {EFIVARS_SYS_MOUNT_POINT}: {e!r}" + ) from e + + +def _trigger_capsule_update_qspi_ota_bootdev() -> bool: # pragma: no cover + """Write magic efivar to trigger firmware Capsule update in next boot. + + This method is for device using QSPI flash as TEGRA_OTA_BOOT_DEVICE. + """ + with _ensure_efivarfs_mounted(): + try: + magic_efivar_fpath = ( + Path(EFIVARS_SYS_MOUNT_POINT) / boot_cfg.UPDATE_TRIGGER_EFIVAR + ) + magic_efivar_fpath.write_bytes(boot_cfg.MAGIC_BYTES) + os.sync() + + return True + except Exception as e: + logger.warning( + ( + f"failed to configure capsule update by write magic value: {e!r}\n" + "firmware update might be skipped!" + ) + ) + return False + + +def _trigger_capsule_update_non_qspi_ota_bootdev( + esp_mp: StrOrPath, +) -> bool: # pragma: no cover + """Write magic efivar to trigger firmware Capsule update in next boot. + + Write a special file with magic value into specific location at internal emmc ESP partition. + + This method is for device NOT using QSPI flash as TEGRA_OTA_BOOT_DEVICE. + """ + uefi_variable_dir = Path(esp_mp) / "EFI/NVDA/Variables" + magic_variable_fpath = uefi_variable_dir / boot_cfg.UPDATE_TRIGGER_EFIVAR + + try: + emmc_esp_partition = _detect_esp_dev(f"/dev/{boot_cfg.INTERNAL_EMMC_DEVNAME}") + with _ensure_esp_mounted(emmc_esp_partition, esp_mp): + magic_variable_fpath.write_bytes(boot_cfg.MAGIC_BYTES) + os.sync() + + return True + except Exception as e: + logger.warning( + ( + f"failed to configure capsule update: {e!r}\n" + "firmware update might be skipped!" + ) + ) + return False + + +@contextlib.contextmanager +def _ensure_esp_mounted( + esp_dev: StrOrPath, mount_point: StrOrPath +) -> Generator[None, Any, None]: # pragma: no cover + """Mount the esp partition and then umount it.""" + mount_point = Path(mount_point) + mount_point.mkdir(exist_ok=True, parents=True) + + try: + CMDHelperFuncs.mount_rw(str(esp_dev), mount_point) + yield + CMDHelperFuncs.umount(mount_point, raise_exception=False) + except Exception as e: + _err_msg = f"failed to mount {esp_dev} to {mount_point}: {e!r}" + logger.error(_err_msg) + raise JetsonUEFIBootControlError(_err_msg) from e + + +def _detect_esp_dev(boot_parent_devpath: StrOrPath) -> str: + # NOTE: if boots from external, expects to have multiple esp parts, + # we need to get the one at our booted parent dev. + esp_parts = CMDHelperFuncs.get_dev_by_token( + token="PARTLABEL", value=boot_cfg.ESP_PARTLABEL + ) + if not esp_parts: + raise JetsonUEFIBootControlError("no ESP partition presented") + + for _esp_part in esp_parts: + if _esp_part.strip().startswith(str(boot_parent_devpath)): + logger.info(f"find esp partition at {_esp_part}") + esp_part = _esp_part + break + else: + _err_msg = f"failed to find esp partition on {boot_parent_devpath}" + logger.error(_err_msg) + raise JetsonUEFIBootControlError(_err_msg) + return esp_part + + +def _detect_ota_bootdev_is_qspi(nvboot_ctrl_conf: str) -> bool | None: + """Detect whether the device is using QSPI or not. + + This is required for determining the way to trigger UEFI Capsule update. + If the TEGRA_OTA_BOOT_DEVICE is mtdblock device, then the device is using QSPI. + If is mmcblk0bootx, then the device is using internal emmc. + + If we cannot determine the TEGRA_OTA_BOOT_DEVICE, we MUST NOT execute the firmware update. + + Returns: + True if device is using QSPI flash, False for device is using internal emmc, + None for unrecognized device or missing TEGRA_OTA_BOOT_DEVICE field in conf. + """ + for line in nvboot_ctrl_conf.splitlines(): + if line.strip().startswith("TEGRA_OTA_BOOT_DEVICE"): + _, _ota_bootdev = line.split(" ", maxsplit=1) + ota_bootdev = _ota_bootdev.strip() + break + else: + logger.warning("no TEGRA_OTA_BOOT_DEVICE field found in nv_boot_ctrl.conf") + return + + if ota_bootdev.startswith("/dev/mmcblk0"): + logger.info("device is using internal emmc flash as TEGRA_OTA_BOOT_DEVICE") + return False + + if ota_bootdev.startswith("/dev/mtdblock0"): + logger.info("device uses QSPI flash as TEGRA_OTA_BOOT_DEVICE") + return True + + logger.warning(f"device uses unknown TEGRA_OTA_BOOT_DEVICE: {ota_bootdev}") + + +class L4TLauncherBSPVersionControl(BaseModel): + """Track the L4TLauncher binary BSP version. + + Schema: : + """ + + bsp_ver: BSPVersion + sha256_digest: str + SEP: ClassVar[Literal[":"]] = ":" + + @classmethod + def parse(cls, _in: str) -> Self: + bsp_str, digest = _in.strip().split(":") + return cls(bsp_ver=BSPVersion.parse(bsp_str), sha256_digest=digest) + + def dump(self) -> str: + return f"{self.bsp_ver.dump()}{self.SEP}{self.sha256_digest}" + + +def _l4tlauncher_version_control( + l4tlauncher_ver_fpath: StrOrPath, + l4tlauncher_at_esp: StrOrPath, + *, + current_slot_bsp_ver: BSPVersion, +) -> BSPVersion: + """Try to determine the current in use l4tlauncher version and update ver control file if needed. + + If the version file is presented and the sha256digest matched, return the BSP version from version file. + + If sha256digest mismatched or version file missing: + 1. try to lookup the L4TLAUNCHER_BSP_VER_SHA256_MAP. + 2. assume that the launcher is the same version of current slot firmware BSP version. + Write new version file after new detecting. + + Returns: + The detected L4TLauncher BSP version. + """ + l4tlauncher_sha256_digest = file_sha256(l4tlauncher_at_esp) + l4tlauncher_ver_fpath = Path(l4tlauncher_ver_fpath) + + # try to determine the version with version file + try: + _ver_control = L4TLauncherBSPVersionControl.parse( + l4tlauncher_ver_fpath.read_text() + ) + if l4tlauncher_sha256_digest == _ver_control.sha256_digest: + return _ver_control.bsp_ver + + logger.warning( + ( + "l4tlauncher sha256 hash mismatched: " + f"{l4tlauncher_sha256_digest=}, {_ver_control.sha256_digest=}, " + "remove version file" + ) + ) + raise ValueError("sha256 hash mismatched") + except Exception as e: + logger.warning(f"missing or invalid l4tlauncher version file: {e!r}") + l4tlauncher_ver_fpath.unlink(missing_ok=True) + + # try to determine the version by looking up table + # NOTE(20240624): since the number of l4tlauncher version is limited, + # we can lookup against a pre-calculated sha256 digest map. + logger.info( + f"try to determine the l4tlauncher verison by hash: {l4tlauncher_sha256_digest}" + ) + if l4tlauncher_bsp_ver := L4TLAUNCHER_BSP_VER_SHA256_MAP.get( + l4tlauncher_sha256_digest + ): + _ver_control = L4TLauncherBSPVersionControl( + bsp_ver=l4tlauncher_bsp_ver, sha256_digest=l4tlauncher_sha256_digest + ) + write_str_to_file_sync(l4tlauncher_ver_fpath, _ver_control.dump()) + return l4tlauncher_bsp_ver + + # NOTE(20240624): if we failed to detect the l4tlauncher's version, + # we assume that the launcher is the same version as current slot's fw. + # This is typically the case of a newly flashed ECU. + logger.warning( + ( + "failed to determine the l4tlauncher's version, assuming " + f"version is the same as current slot's fw version: {current_slot_bsp_ver}" + ) + ) + _ver_control = L4TLauncherBSPVersionControl( + bsp_ver=current_slot_bsp_ver, sha256_digest=l4tlauncher_sha256_digest + ) + write_str_to_file_sync(l4tlauncher_ver_fpath, _ver_control.dump()) + return current_slot_bsp_ver + + +class UEFIFirmwareUpdater: + """Firmware update implementation using Capsule update.""" + + def __init__( + self, + boot_parent_devpath: StrOrPath, + standby_slot_mp: StrOrPath, + *, + nvbootctrl_conf: str, + fw_bsp_ver_control: FirmwareBSPVersionControl, + firmware_update_request: FirmwareUpdateRequest, + firmware_manifest: FirmwareManifest, + ) -> None: + """Init an instance of UEFIFirmwareUpdater. + + Args: + boot_parent_devpath (StrOrPath): The parent dev of current rootfs device. + standby_slot_mp (StrOrPath): The mount point of updated standby slot. The firmware update package + is located at /opt/ota_package folder. + nvbootctrl_conf (str): The contents of nv_boot_ctrl.conf file. + fw_bsp_ver_control (FirmwareBSPVersionControl): The firmware BSP version control for each slots. + firmware_update_request (FirmwareUpdateRequest) + firmware_manifest (FirmwareBSPVersionControl) + """ + self.current_slot_bsp_ver = fw_bsp_ver_control.current_slot_bsp_ver + + self.nvbootctrl_conf = nvbootctrl_conf + self.firmware_update_request = firmware_update_request + self.firmware_manifest = firmware_manifest + self.firmware_package_bsp_ver = BSPVersion.parse( + firmware_manifest.firmware_spec.bsp_version + ) + + self.esp_part = _detect_esp_dev(boot_parent_devpath) + # NOTE: use the esp partition at the current booted device + # i.e., if we boot from nvme0n1, then bootdev_path is /dev/nvme0n1 and + # we use the esp at nvme0n1. + self.esp_mp = Path(boot_cfg.ESP_MOUNTPOINT) + self.esp_mp.mkdir(exist_ok=True) + + esp_boot_dir = self.esp_mp / "EFI" / "BOOT" + self.l4tlauncher_ver_fpath = esp_boot_dir / boot_cfg.L4TLAUNCHER_VER_FNAME + """A plain text file stores the BSP version string.""" + self.bootaa64_at_esp = esp_boot_dir / boot_cfg.L4TLAUNCHER_FNAME + """The canonical location of L4TLauncher, called by UEFI.""" + self.bootaa64_at_esp_bak = esp_boot_dir / f"{boot_cfg.L4TLAUNCHER_FNAME}_bak" + """The location to backup current L4TLauncher binary.""" + self.capsule_dir_at_esp = self.esp_mp / boot_cfg.CAPSULE_PAYLOAD_AT_ESP + """The location to put UEFI capsule to. The UEFI will use the capsule in this location.""" + + self.standby_slot_mp = Path(standby_slot_mp) + + def _prepare_fwupdate_capsule(self) -> bool: + """Copy the Capsule update payloads to specific location at esp partition. + + The UEFI framework will look for the firmware update capsule there and start + the firmware update. + + Returns: + True if at least one of the update capsule is prepared, False if no update + capsule is available and configured. + """ + capsule_dir_at_esp = self.capsule_dir_at_esp + capsule_dir_at_esp.mkdir(parents=True, exist_ok=True) + + # ------ prepare capsule update payload ------ # + firmware_package_configured = False + for capsule_payload in self.firmware_manifest.get_firmware_packages( + self.firmware_update_request + ): + if capsule_payload.type != PayloadType.UEFI_CAPSULE: + continue + + # NOTE: currently we only support payload indicated by file path. + capsule_flocation = capsule_payload.file_location + capsule_fpath = capsule_flocation.location_path + assert capsule_flocation.location_type == "file" and isinstance( + capsule_fpath, str + ) + + capsule_fpath = Path(replace_root(capsule_fpath, "/", self.standby_slot_mp)) + capsule_digest_alg, capsule_digest_value = ( + capsule_payload.digest.algorithm, + capsule_payload.digest.digest, + ) + + try: + _digest = file_digest(capsule_fpath, algorithm=capsule_digest_alg) + assert ( + _digest == capsule_digest_value + ), f"{capsule_digest_alg} validation failed, expect {capsule_digest_value}, get {_digest}" + + shutil.copy( + src=capsule_fpath, + dst=capsule_dir_at_esp / capsule_payload.payload_name, + ) + firmware_package_configured = True + logger.info( + f"copy {capsule_payload.payload_name} to {capsule_dir_at_esp}" + ) + except Exception as e: + logger.warning( + f"failed to copy {capsule_payload.payload_name} to {capsule_dir_at_esp}: {e!r}" + ) + logger.warning(f"skip preparing {capsule_payload.payload_name}") + return firmware_package_configured + + def _update_l4tlauncher(self) -> bool: + """update L4TLauncher with OTA image's one.""" + for _payload in self.firmware_manifest.get_firmware_packages( + self.firmware_update_request + ): + if _payload.type != PayloadType.UEFI_BOOT_APP: + continue + + logger.warning( + f"update the l4tlauncher to version {self.firmware_package_bsp_ver} ..." + ) + + # NOTE: currently we only support payload indicated by file path. + bootapp_flocation = _payload.file_location + bootapp_fpath, bootapp_ftype = ( + bootapp_flocation.location_path, + bootapp_flocation.location_type, + ) + assert bootapp_ftype == "file" and isinstance(bootapp_fpath, str) + + # new BOOTAA64.efi is located at OTA image + ota_image_bootaa64 = replace_root(bootapp_fpath, "/", self.standby_slot_mp) + payload_digest_alg, payload_digest_value = ( + _payload.digest.algorithm, + _payload.digest.digest, + ) + + new_l4tlauncher_ver_ctrl = L4TLauncherBSPVersionControl( + bsp_ver=self.firmware_package_bsp_ver, + sha256_digest=_payload.digest.digest, + ) + try: + _digest = file_digest(ota_image_bootaa64, algorithm=payload_digest_alg) + assert ( + _digest == payload_digest_value + ), f"{payload_digest_alg} validation failed, expect {payload_digest_value}, get {_digest}" + + shutil.copy(self.bootaa64_at_esp, self.bootaa64_at_esp_bak) + shutil.copy(ota_image_bootaa64, self.bootaa64_at_esp) + os.sync() # ensure the boot application is written to the disk + + write_str_to_file_sync( + self.l4tlauncher_ver_fpath, + new_l4tlauncher_ver_ctrl.dump(), + ) + return True + except Exception as e: + _err_msg = f"failed to prepare boot application update: {e!r}, skip" + logger.warning(_err_msg) + + logger.info("no boot application update is configured in the request, skip") + return False + + # APIs + + def firmware_update(self) -> bool: + """Trigger firmware update in next boot if configured. + + Only when the following conditions met the firmware update will be configured: + 1. a firmware_update_request file is presented and valid in the OTA image. + 2. the firmware_manifest is presented and valid in the OTA image. + 3. the firmware package is compatible with the device. + 4. firmware specified in firmware_update_request is valid. + 5. the firmware package's BSP version is the same or newer than current slot's firmware BSP version. + + NOTE: firmware CANNOT be downgraded with UEFI Capsule update mechanism. + NOTE(20240826): the firmware update is executed by the CURRENT slot's UEFI framework, so + we need to check the firmware package's version against CURRENT slot's firmware BSP version. + + Returns: + True if firmware update is configured, False if there is no firmware update. + """ + # sanity check, UEFI Capsule firmware update is only supported after R35.2. + if self.current_slot_bsp_ver < FIRMWARE_UPDATE_MINIMUM_SUPPORTED_BSP_VERSION: + _err_msg = ( + f"UEFI Capsule firmware update is introduced since {FIRMWARE_UPDATE_MINIMUM_SUPPORTED_BSP_VERSION}, " + f"but get {self.current_slot_bsp_ver=}, abort" + ) + logger.error(_err_msg) + return False + + if ( + self.firmware_package_bsp_ver + < FIRMWARE_UPDATE_MINIMUM_SUPPORTED_BSP_VERSION + ): + _err_msg = ( + f"update firmware to BSP version < {FIRMWARE_UPDATE_MINIMUM_SUPPORTED_BSP_VERSION} " + "is not supported, abort" + ) + logger.warning(_err_msg) + return False + + # check BSP version, NVIDIA Jetson device with R34 or newer doesn't allow firmware downgrade. + if self.current_slot_bsp_ver > self.firmware_package_bsp_ver: + logger.info( + ( + "current slot's firmware BSP version is newer than firmware package's," + " skip firmware update: " + f"{self.current_slot_bsp_ver=}, {self.firmware_package_bsp_ver=}" + ) + ) + return False + + # check firmware compatibility, this is to prevent failed firmware update beforehand. + tnspec = get_nvbootctrl_conf_tnspec(self.nvbootctrl_conf) + if not tnspec: + logger.warning( + "TNSPEC is not defined in nvbootctrl config file, skip firmware update!" + ) + return False + + if not self.firmware_manifest.check_compat(tnspec): + _err_msg = ( + "firmware package is incompatible with this device: " + f"{tnspec=}, {self.firmware_manifest.firmware_spec.firmware_compat}, " + "skip firmware update" + ) + logger.warning(_err_msg) + return False + + # copy the firmware update capsule to specific location + with _ensure_esp_mounted(self.esp_part, self.esp_mp): + if not self._prepare_fwupdate_capsule(): + logger.info("no firmware file is prepared, skip firmware update") + return False + + logger.info("on capsule prepared, try to update L4TLauncher ...") + l4tlauncher_bsp_ver = _l4tlauncher_version_control( + l4tlauncher_ver_fpath=self.l4tlauncher_ver_fpath, + l4tlauncher_at_esp=self.bootaa64_at_esp, + current_slot_bsp_ver=self.current_slot_bsp_ver, + ) + logger.info(f"current l4tlauncher version: {l4tlauncher_bsp_ver}") + + if l4tlauncher_bsp_ver >= self.firmware_package_bsp_ver: + logger.info( + ( + "installed l4tlauncher has newer or equal version of l4tlauncher to OTA image's one, " + f"{l4tlauncher_bsp_ver=}, {self.firmware_package_bsp_ver=}, " + "skip l4tlauncher update" + ) + ) + else: + self._update_l4tlauncher() + + # write special UEFI variable to trigger firmware update on next reboot + device_uses_qspi = _detect_ota_bootdev_is_qspi(self.nvbootctrl_conf) + if device_uses_qspi is None: + logger.warning("failed to detect OTA_BOOTDEV, skip firmware update") + return False + + if device_uses_qspi: + firmware_update_triggerred = _trigger_capsule_update_qspi_ota_bootdev() + else: + firmware_update_triggerred = _trigger_capsule_update_non_qspi_ota_bootdev( + self.esp_mp + ) + + if firmware_update_triggerred: + logger.warning( + "firmware update package prepare finished. " + f"will update firmware to {self.firmware_package_bsp_ver} in next reboot" + ) + return firmware_update_triggerred + + +class _UEFIBootControl: + """Low-level boot control implementation for jetson-uefi.""" + + def __init__(self): + # ------ sanity check, confirm we are at jetson device ------ # + tegra_compat_info_fpath = Path(boot_cfg.TEGRA_COMPAT_PATH) + if not tegra_compat_info_fpath.is_file(): + _err_msg = f"not a jetson device, {tegra_compat_info_fpath} doesn't exist" + logger.error(_err_msg) + raise JetsonUEFIBootControlError(_err_msg) + + # print hardware model + if (model_fpath := Path(boot_cfg.MODEL_FPATH)).is_file(): + logger.info(f"hardware model: {model_fpath.read_text()}") + + compat_info = tegra_compat_info_fpath.read_text() + # example compatible string: + # nvidia,p3737-0000+p3701-0000nvidia,tegra234nvidia,tegra23x + if compat_info.find("tegra") == -1: + _err_msg = f"uncompatible device: {compat_info=}" + logger.error(_err_msg) + raise JetsonUEFIBootControlError(_err_msg) + logger.info(f"dev compatibility: {compat_info}") + + # ------ load nvbootctrl config file ------ # + if not ( + nvbootctrl_conf_fpath := Path(boot_cfg.NVBOOTCTRL_CONF_FPATH) + ).is_file(): + _err_msg = "nv_boot_ctrl.conf is missing!" + logger.error(_err_msg) + raise JetsonUEFIBootControlError(_err_msg) + self.nvbootctrl_conf = nvbootctrl_conf_fpath.read_text() + logger.info(f"nvboot_ctrl_conf: \n{self.nvbootctrl_conf}") + + # ------ check current slot BSP version ------ # + # check current slot firmware BSP version + try: + self.fw_bsp_version = fw_bsp_version = ( + NVBootctrlJetsonUEFI.get_current_fw_bsp_version() + ) + assert fw_bsp_version, "bsp version information not available" + logger.info(f"current slot firmware BSP version: {fw_bsp_version}") + except Exception as e: + _err_msg = f"failed to detect BSP version: {e!r}" + logger.error(_err_msg) + raise JetsonUEFIBootControlError(_err_msg) from None + + # check current slot rootfs BSP version + try: + self.rootfs_bsp_verion = rootfs_bsp_version = detect_rootfs_bsp_version( + rootfs=cfg.ACTIVE_ROOTFS_PATH + ) + logger.info(f"current slot rootfs BSP version: {rootfs_bsp_version}") + except Exception as e: + logger.warning(f"failed to detect rootfs BSP version: {e!r}") + self.rootfs_bsp_verion = rootfs_bsp_version = None + + if rootfs_bsp_version and rootfs_bsp_version > fw_bsp_version: + logger.warning( + ( + "current slot's rootfs bsp version is newer than the firmware bsp version, " + "this indicates the device previously only receive rootfs update, this is unexpected" + ) + ) + + if rootfs_bsp_version != fw_bsp_version: + logger.warning( + ( + f"current slot has rootfs BSP version {rootfs_bsp_version}, " + f"while the firmware version is {fw_bsp_version}, " + "rootfs BSP version and firmware version are MISMATCHED! " + "This might result in nvbootctrl not working as expected!" + ) + ) + + # ------ sanity check, jetson-uefi only supports >= R34 ----- # + if fw_bsp_version < MINIMUM_SUPPORTED_BSP_VERSION: + _err_msg = f"jetson-uefi only supports BSP version >= {MINIMUM_SUPPORTED_BSP_VERSION}, but get {fw_bsp_version=}" + logger.error(_err_msg) + raise JetsonUEFIBootControlError(_err_msg) + + # unified A/B is enabled by default and cannot be disabled after BSP R34. + logger.info("unified A/B is enabled") + + # ------ check A/B slots ------ # + try: + self.current_slot = current_slot = NVBootctrlJetsonUEFI.get_current_slot() + self.standby_slot = standby_slot = NVBootctrlJetsonUEFI.get_standby_slot() + self.active_bootloader_slot = ( + NVBootctrlJetsonUEFI.get_active_bootloader_slot() + ) + except NVBootctrlExecError as e: + _err_msg = f"failed to get slot info: {e!r}" + logger.error(_err_msg) + raise JetsonUEFIBootControlError(_err_msg) from e + logger.info(f"{current_slot=}, {standby_slot=}") + + # ------ detect rootfs_dev and parent_dev ------ # + try: + self.curent_rootfs_devpath = current_rootfs_devpath = ( + CMDHelperFuncs.get_current_rootfs_dev() + ) + self.parent_devpath = parent_devpath = Path( + CMDHelperFuncs.get_parent_dev(current_rootfs_devpath) + ) + except Exception as e: + _err_msg = f"failed to detect rootfs dev: {e!r}" + logger.error(_err_msg) + raise JetsonUEFIBootControlError(_err_msg) from e + + # --- detect boot device --- # + self.external_rootfs = detect_external_rootdev(parent_devpath) + + self.standby_rootfs_devpath = get_partition_devpath( + parent_devpath=parent_devpath, + partition_id=SLOT_PAR_MAP[standby_slot], + ) + + try: + self.standby_rootfs_dev_partuuid = CMDHelperFuncs.get_attrs_by_dev( + "PARTUUID", self.standby_rootfs_devpath + ).strip() + current_rootfs_dev_partuuid = CMDHelperFuncs.get_attrs_by_dev( + "PARTUUID", current_rootfs_devpath + ).strip() + except Exception as e: + _err_msg = f"failed to detect rootfs PARTUUID: {e!r}" + logger.error(_err_msg) + raise JetsonUEFIBootControlError(_err_msg) from e + + logger.info( + "rootfs device: \n" + f"active_rootfs(slot {current_slot}): {self.curent_rootfs_devpath=}, {current_rootfs_dev_partuuid=}\n" + f"standby_rootfs(slot {standby_slot}): {self.standby_rootfs_devpath=}, {self.standby_rootfs_dev_partuuid=}" + ) + + self.standby_internal_emmc_devpath = get_partition_devpath( + parent_devpath=f"/dev/{boot_cfg.INTERNAL_EMMC_DEVNAME}", + partition_id=SLOT_PAR_MAP[standby_slot], + ) + + logger.info("finished jetson-uefi boot control startup") + logger.info( + f"nvbootctrl dump-slots-info: \n{NVBootctrlJetsonUEFI.dump_slots_info()}" + ) + + # API + + def switch_boot_to_standby(self) -> None: # pragma: no cover + target_slot = self.standby_slot + + logger.info(f"switch boot to standby slot({target_slot})") + # when unified_ab enabled, switching bootloader slot will also switch + # the rootfs slot. + NVBootctrlJetsonUEFI.set_active_boot_slot(target_slot) + + +class JetsonUEFIBootControl(BootControllerProtocol): + """BootControllerProtocol implementation for jetson-uefi.""" + + def __init__(self) -> None: + try: + self._uefi_control = uefi_control = _UEFIBootControl() + + # mount point prepare + self._mp_control = SlotMountHelper( + standby_slot_dev=uefi_control.standby_rootfs_devpath, + standby_slot_mount_point=cfg.MOUNT_POINT, + active_slot_dev=self._uefi_control.curent_rootfs_devpath, + active_slot_mount_point=cfg.ACTIVE_ROOT_MOUNT_POINT, + ) + + # init ota-status files + current_ota_status_dir = Path(boot_cfg.OTA_STATUS_DIR) + standby_ota_status_dir = Path( + replace_root( + boot_cfg.OTA_STATUS_DIR, + "/", + cfg.MOUNT_POINT, + ) + ) + self._ota_status_control = OTAStatusFilesControl( + active_slot=str(self._uefi_control.current_slot), + standby_slot=str(self._uefi_control.standby_slot), + current_ota_status_dir=current_ota_status_dir, + # NOTE: might not yet be populated before OTA update applied! + standby_ota_status_dir=standby_ota_status_dir, + finalize_switching_boot=self._finalize_switching_boot, + ) + + # load firmware BSP version + current_fw_bsp_ver_fpath = ( + current_ota_status_dir / boot_cfg.FIRMWARE_BSP_VERSION_FNAME + ) + self._firmware_bsp_ver_control = bsp_ver_ctrl = FirmwareBSPVersionControl( + current_slot=uefi_control.current_slot, + current_slot_bsp_ver=uefi_control.fw_bsp_version, + current_bsp_version_file=current_fw_bsp_ver_fpath, + ) + # always update the bsp_version_file on startup to reflect + # the up-to-date current slot BSP version + self._firmware_bsp_ver_control.write_to_file(current_fw_bsp_ver_fpath) + logger.info( + f"\ncurrent slot firmware BSP version: {uefi_control.fw_bsp_version}\n" + f"standby slot firmware BSP version: {bsp_ver_ctrl.standby_slot_bsp_ver}" + ) + + logger.info("jetson-uefi boot control start up finished") + except Exception as e: + _err_msg = f"failed to start jetson-uefi controller: {e!r}" + raise ota_errors.BootControlStartupFailed(_err_msg, module=__name__) from e + + def _finalize_switching_boot(self) -> bool: + """Verify firmware update result and write firmware BSP version file. + + NOTE that if capsule firmware update failed, we must be booted back to the + previous slot, so actually we don't need to do any checks here. + """ + fw_update_verify = NVBootctrlJetsonUEFI.verify() + if fw_update_verify: + logger.info(f"nvbootctrl verify: {fw_update_verify}") + return True + + def _firmware_update(self) -> bool: + """Perform firmware update with UEFI Capsule update if needed. + + Returns: + True if there is firmware update configured, False for no firmware update. + """ + logger.info("jetson-uefi: checking if we need to do firmware update ...") + firmware_package_meta = load_firmware_package( + firmware_update_request_fpath=replace_root( + boot_cfg.FIRMWARE_UPDATE_REQUEST_FPATH, + "/", + self._mp_control.standby_slot_mount_point, + ), + firmware_manifest_fpath=replace_root( + boot_cfg.FIRMWARE_MANIFEST_FPATH, + "/", + self._mp_control.standby_slot_mount_point, + ), + ) + if firmware_package_meta is None: + logger.info("skip firmware update ...") + return False + firmware_update_request, firmware_manifest = firmware_package_meta + + fw_update_bsp_ver = BSPVersion.parse( + firmware_manifest.firmware_spec.bsp_version + ) + logger.info(f"firmware update package BSP version: {fw_update_bsp_ver}") + + # ------ prepare firmware update ------ # + firmware_updater = UEFIFirmwareUpdater( + boot_parent_devpath=self._uefi_control.parent_devpath, + standby_slot_mp=self._mp_control.standby_slot_mount_point, + fw_bsp_ver_control=self._firmware_bsp_ver_control, + firmware_update_request=firmware_update_request, + firmware_manifest=firmware_manifest, + nvbootctrl_conf=self._uefi_control.nvbootctrl_conf, + ) + return firmware_updater.firmware_update() + + # APIs + + def get_standby_slot_path(self) -> Path: # pragma: no cover + return self._mp_control.standby_slot_mount_point + + def get_standby_boot_dir(self) -> Path: # pragma: no cover + return self._mp_control.standby_boot_dir + + def pre_update(self, version: str, *, standby_as_ref: bool, erase_standby: bool): + try: + logger.info("jetson-uefi: pre-update ...") + self._ota_status_control.pre_update_current() + + self._mp_control.prepare_standby_dev(erase_standby=erase_standby) + self._mp_control.mount_standby() + self._mp_control.mount_active() + + self._ota_status_control.pre_update_standby(version=version) + except Exception as e: + _err_msg = f"failed on pre_update: {e!r}" + logger.error(_err_msg) + raise ota_errors.BootControlPreUpdateFailed( + _err_msg, module=__name__ + ) from e + + def post_update(self) -> Generator[None, None, None]: + try: + logger.info("jetson-uefi: post-update ...") + # ------ update extlinux.conf ------ # + update_standby_slot_extlinux_cfg( + active_slot_extlinux_fpath=Path(boot_cfg.EXTLINUX_FILE), + standby_slot_extlinux_fpath=Path( + replace_root( + boot_cfg.EXTLINUX_FILE, + "/", + self._mp_control.standby_slot_mount_point, + ) + ), + standby_slot_partuuid=self._uefi_control.standby_rootfs_dev_partuuid, + ) + + # ------ preserve BSP version file to standby slot ------ # + standby_fw_bsp_ver_fpath = ( + self._ota_status_control.standby_ota_status_dir + / boot_cfg.FIRMWARE_BSP_VERSION_FNAME + ) + self._firmware_bsp_ver_control.write_to_file(standby_fw_bsp_ver_fpath) + + # ------ preserve /boot/ota folder to standby rootfs ------ # + preserve_ota_config_files_to_standby( + active_slot_ota_dirpath=self._mp_control.active_slot_mount_point + / "boot" + / "ota", + standby_slot_ota_dirpath=self._mp_control.standby_slot_mount_point + / "boot" + / "ota", + ) + + # ------ firmware update & switch boot to standby ------ # + if ( + self._uefi_control.active_bootloader_slot + and self._uefi_control.current_slot + != self._uefi_control.active_bootloader_slot + ): + _err_msg = ( + "warning, active bootloader slot is mismatched with current slot: " + f"current_slot={self._uefi_control.current_slot}, " + f"active_bootloader_slot={self._uefi_control.active_bootloader_slot}. " + "this migth indicate an previous interrupted OTA, and this mismatch " + "will cancel the firmware update if configured, " + "correct this mismatch ..." + ) + logger.warning(_err_msg) + NVBootctrlJetsonUEFI.set_active_boot_slot( + self._uefi_control.current_slot + ) + self._uefi_control.active_bootloader_slot = ( + NVBootctrlJetsonUEFI.get_active_bootloader_slot() + ) + + firmware_update_triggered = self._firmware_update() + # NOTE: manual switch boot will cancel the scheduled firmware update! + if not firmware_update_triggered: + self._uefi_control.switch_boot_to_standby() + logger.info( + f"no firmware update configured, manually switch slot: \n{NVBootctrlJetsonUEFI.dump_slots_info()}" + ) + + # ------ for external rootfs, preserve /boot folder to internal ------ # + # NOTE: the copy should happen AFTER the changes to /boot folder at active slot. + if self._uefi_control.external_rootfs: + logger.info( + "rootfs on external storage enabled: " + "copy standby slot rootfs' /boot folder " + "to corresponding internal emmc dev ..." + ) + copy_standby_slot_boot_to_internal_emmc( + internal_emmc_mp=Path(boot_cfg.SEPARATE_BOOT_MOUNT_POINT), + internal_emmc_devpath=self._uefi_control.standby_internal_emmc_devpath, + standby_slot_boot_dirpath=self._mp_control.standby_slot_mount_point + / "boot", + ) + + # ------ prepare to reboot ------ # + self._mp_control.umount_all(ignore_error=True) + logger.info("post update finished, wait for reboot ...") + yield # hand over control back to otaclient + CMDHelperFuncs.reboot() + except Exception as e: + _err_msg = f"jetson-uefi: failed on post_update: {e!r}" + logger.error(_err_msg) + raise ota_errors.BootControlPostUpdateFailed( + _err_msg, module=__name__ + ) from e + + def pre_rollback(self): + try: + logger.info("jetson-uefi: pre-rollback setup ...") + self._ota_status_control.pre_rollback_current() + self._mp_control.mount_standby() + self._ota_status_control.pre_rollback_standby() + except Exception as e: + _err_msg = f"jetson-uefi: failed on pre_rollback: {e!r}" + logger.error(_err_msg) + raise ota_errors.BootControlPreRollbackFailed( + _err_msg, module=__name__ + ) from e + + def post_rollback(self): + try: + logger.info("jetson-uefi: post-rollback setup...") + self._mp_control.umount_all(ignore_error=True) + self._uefi_control.switch_boot_to_standby() + CMDHelperFuncs.reboot() + except Exception as e: + _err_msg = f"jetson-uefi: failed on post_rollback: {e!r}" + logger.error(_err_msg) + raise ota_errors.BootControlPostRollbackFailed( + _err_msg, module=__name__ + ) from e + + def on_operation_failure(self): + """Failure registering and cleanup at failure.""" + logger.warning("on failure try to unmounting standby slot...") + self._ota_status_control.on_failure() + self._mp_control.umount_all(ignore_error=True) + + def load_version(self) -> str: # pragma: no cover + return self._ota_status_control.load_active_slot_version() + + def get_booted_ota_status(self) -> api_types.StatusOta: # pragma: no cover + return self._ota_status_control.booted_ota_status diff --git a/src/otaclient/boot_control/configs.py b/src/otaclient/boot_control/configs.py index 915db9b37..852f2854a 100644 --- a/src/otaclient/boot_control/configs.py +++ b/src/otaclient/boot_control/configs.py @@ -35,20 +35,28 @@ class GrubControlConfig(BaseConfig): class JetsonBootCommon: - TEGRA_CHIP_ID_PATH = "/sys/module/tegra_fuse/parameters/tegra_chip_id" + # ota_status related OTA_STATUS_DIR = "/boot/ota-status" FIRMWARE_BSP_VERSION_FNAME = "firmware_bsp_version" - EXTLINUX_FILE = "/boot/extlinux/extlinux.conf" - FIRMWARE_DPATH = "/opt/ota_package" - """Refer to standby slot rootfs.""" + # boot control related + EXTLINUX_FILE = "/boot/extlinux/extlinux.conf" + MODEL_FPATH = "/proc/device-tree/model" NV_TEGRA_RELEASE_FPATH = "/etc/nv_tegra_release" SEPARATE_BOOT_MOUNT_POINT = "/mnt/standby_boot" + # boot device related MMCBLK_DEV_PREFIX = "mmcblk" # internal emmc NVMESSD_DEV_PREFIX = "nvme" # external nvme ssd + SDX_DEV_PREFIX = "sd" # non-specific device name INTERNAL_EMMC_DEVNAME = "mmcblk0" + # firmware update related + NVBOOTCTRL_CONF_FPATH = "/etc/nv_boot_control.conf" + FIRMWARE_DPATH = "/opt/ota/firmware" + FIRMWARE_UPDATE_REQUEST_FPATH = f"{FIRMWARE_DPATH}/firmware_update.yaml" + FIRMWARE_MANIFEST_FPATH = f"{FIRMWARE_DPATH}/firmware_manifest.yaml" + class JetsonCBootControlConfig(JetsonBootCommon): """Jetson device booted with cboot. @@ -57,9 +65,23 @@ class JetsonCBootControlConfig(JetsonBootCommon): """ BOOTLOADER = BootloaderType.JETSON_CBOOT + # this path only exists on xavier + TEGRA_CHIP_ID_PATH = "/sys/module/tegra_fuse/parameters/tegra_chip_id" FIRMWARE_LIST = ["bl_only_payload", "xusb_only_payload"] +class JetsonUEFIBootControlConfig(JetsonBootCommon): + BOOTLOADER = BootloaderType.JETSON_UEFI + TEGRA_COMPAT_PATH = "/sys/firmware/devicetree/base/compatible" + L4TLAUNCHER_FNAME = "BOOTAA64.efi" + ESP_MOUNTPOINT = "/mnt/esp" + ESP_PARTLABEL = "esp" + UPDATE_TRIGGER_EFIVAR = "OsIndications-8be4df61-93ca-11d2-aa0d-00e098032b8c" + MAGIC_BYTES = b"\x07\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00" + CAPSULE_PAYLOAD_AT_ESP = "EFI/UpdateCapsule" + L4TLAUNCHER_VER_FNAME = "l4tlauncher_version" + + @dataclass class RPIBootControlConfig(BaseConfig): BBOOTLOADER: BootloaderType = BootloaderType.RPI_BOOT @@ -73,5 +95,9 @@ class RPIBootControlConfig(BaseConfig): grub_cfg = GrubControlConfig() + +jetson_common_cfg = JetsonBootCommon() cboot_cfg = JetsonCBootControlConfig() +jetson_uefi_cfg = JetsonUEFIBootControlConfig() + rpi_boot_cfg = RPIBootControlConfig() diff --git a/src/otaclient/boot_control/selecter.py b/src/otaclient/boot_control/selecter.py index 28f272759..6f853c72f 100644 --- a/src/otaclient/boot_control/selecter.py +++ b/src/otaclient/boot_control/selecter.py @@ -90,6 +90,12 @@ def get_boot_controller( from ._jetson_cboot import JetsonCBootControl return JetsonCBootControl + + if bootloader_type == BootloaderType.JETSON_UEFI: + from ._jetson_uefi import JetsonUEFIBootControl + + return JetsonUEFIBootControl + if bootloader_type == BootloaderType.RPI_BOOT: from ._rpi_boot import RPIBootController diff --git a/src/otaclient/configs/ecu_info.py b/src/otaclient/configs/ecu_info.py index dffb3ca8a..08b850bb7 100644 --- a/src/otaclient/configs/ecu_info.py +++ b/src/otaclient/configs/ecu_info.py @@ -46,6 +46,7 @@ class BootloaderType(str, Enum): GRUB = "grub" CBOOT = "cboot" # deprecated, use jetson-cboot instead JETSON_CBOOT = "jetson-cboot" + JETSON_UEFI = "jetson-uefi" RPI_BOOT = "rpi_boot" @staticmethod diff --git a/src/otaclient_common/__init__.py b/src/otaclient_common/__init__.py index 593eee24f..56ffe96c6 100644 --- a/src/otaclient_common/__init__.py +++ b/src/otaclient_common/__init__.py @@ -44,7 +44,9 @@ def get_file_size( return ceil(swapfile_fpath.stat().st_size / _multiplier[units]) -def replace_root(path: str | Path, old_root: str | Path, new_root: str | Path) -> str: +def replace_root( + path: str | Path, old_root: str | Path | Literal["/"], new_root: str | Path +) -> str: """Replace a relative to to . For example, if path="/abc", old_root="/", new_root="/new_root", diff --git a/src/otaclient_common/common.py b/src/otaclient_common/common.py index f8136236d..f81b362d4 100644 --- a/src/otaclient_common/common.py +++ b/src/otaclient_common/common.py @@ -20,12 +20,13 @@ from __future__ import annotations +import hashlib import logging import os import shutil import subprocess import time -from hashlib import sha256 +from functools import partial from pathlib import Path from typing import Optional, Union from urllib.parse import urljoin @@ -33,6 +34,7 @@ import requests from otaclient_common.linux import subprocess_run_wrapper +from otaclient_common.typing import StrOrPath logger = logging.getLogger(__name__) @@ -52,17 +54,17 @@ def wait_with_backoff(_retry_cnt: int, *, _backoff_factor: float, _backoff_max: # file verification -def file_sha256( - filename: Union[Path, str], *, chunk_size: int = 1 * 1024 * 1024 -) -> str: - with open(filename, "rb") as f: - m = sha256() - while True: - d = f.read(chunk_size) - if len(d) == 0: - break - m.update(d) - return m.hexdigest() +def file_digest(fpath: StrOrPath, *, algorithm: str, chunk_size: int = 1 * 1024 * 1024): + """Generate file digest with .""" + with open(fpath, "rb") as f: + hasher = hashlib.new(algorithm) + while d := f.read(chunk_size): + hasher.update(d) + return hasher.hexdigest() + + +file_sha256 = partial(file_digest, algorithm="sha256") +file_sha256.__doc__ = "Generate file digest with sha256." def verify_file(fpath: Path, fhash: str, fsize: Optional[int]) -> bool: diff --git a/tests/test_otaclient/test_boot_control/test_firmware_package.py b/tests/test_otaclient/test_boot_control/test_firmware_package.py index a5cf8a5ff..0a5884e32 100644 --- a/tests/test_otaclient/test_boot_control/test_firmware_package.py +++ b/tests/test_otaclient/test_boot_control/test_firmware_package.py @@ -25,7 +25,7 @@ FirmwareUpdateRequest, HardwareType, NVIDIAFirmwareCompat, - NVIDIAUEFIFirmwareSpec, + NVIDIAFirmwareSpec, PayloadFileLocation, PayloadType, ) @@ -50,8 +50,8 @@ ), ), ) -def test_digest_value_parsing(_in, _expected): - _parsed = DigestValue(_in) +def test_digest_value_parsing(_in, _expected: list[str]): + _parsed = DigestValue.parse(_in) assert _parsed.algorithm == _expected[0] assert _parsed.digest == _expected[1] @@ -67,13 +67,13 @@ def test_digest_value_parsing(_in, _expected): _in := "sha256:32baa6f7e96661d50fb78e5d7149763e3a0fe70c51c37c6bea92c3c27cd2472d", [ "blob", - DigestValue(_in), + DigestValue.parse(_in), ], ), ), ) -def test_payload_file_location(_in, _expected): - _parsed = PayloadFileLocation(_in) +def test_payload_file_location(_in, _expected: list[str] | list[str | DigestValue]): + _parsed = PayloadFileLocation.parse(_in) assert _parsed.location_type == _expected[0] assert _parsed.location_path == _expected[1] @@ -133,6 +133,10 @@ def test_check_compat(_spec, _compat_str, _expected): file_location: file:///opt/ota/firmware/BOOTAA64.efi type: UEFI-BOOT-APP digest: "sha256:ac17457772666351154a5952e3b87851a6398da2afcf3a38bedfc0925760bb0e" + - payload_name: some_payload_in_blob_storage + file_location: "sha256:55f91b2fc9c397cc83ca7c23e627d09b542ae02381db3ca480e0242fca14e935" + type: UEFI-CAPSULE + digest: "sha256:55f91b2fc9c397cc83ca7c23e627d09b542ae02381db3ca480e0242fca14e935" """ EXAMPLE_FIRMWARE_MANIFEST_PARSED = FirmwareManifest( @@ -140,7 +144,7 @@ def test_check_compat(_spec, _compat_str, _expected): hardware_type=HardwareType("nvidia_jetson"), hardware_series="ADLINK RQX", hardware_model="rqx580", - firmware_spec=NVIDIAUEFIFirmwareSpec( + firmware_spec=NVIDIAFirmwareSpec( bsp_version="r35.4.1", firmware_compat=NVIDIAFirmwareCompat( board_id="2888", @@ -153,22 +157,34 @@ def test_check_compat(_spec, _compat_str, _expected): firmware_packages=[ FirmwarePackage( payload_name="bl_only_payload.Cap", - file_location=PayloadFileLocation( + file_location=PayloadFileLocation.parse( "file:///opt/ota/firmware/bl_only_payload.Cap" ), type=PayloadType("UEFI-CAPSULE"), - digest=DigestValue( + digest=DigestValue.parse( "sha256:32baa6f7e96661d50fb78e5d7149763e3a0fe70c51c37c6bea92c3c27cd2472d" ), ), FirmwarePackage( payload_name="BOOTAA64.efi", - file_location=PayloadFileLocation("file:///opt/ota/firmware/BOOTAA64.efi"), + file_location=PayloadFileLocation.parse( + "file:///opt/ota/firmware/BOOTAA64.efi" + ), type=PayloadType("UEFI-BOOT-APP"), - digest=DigestValue( + digest=DigestValue.parse( "sha256:ac17457772666351154a5952e3b87851a6398da2afcf3a38bedfc0925760bb0e" ), ), + FirmwarePackage( + payload_name="some_payload_in_blob_storage", + file_location=PayloadFileLocation.parse( + "sha256:55f91b2fc9c397cc83ca7c23e627d09b542ae02381db3ca480e0242fca14e935" + ), + type=PayloadType("UEFI-CAPSULE"), + digest=DigestValue.parse( + "sha256:55f91b2fc9c397cc83ca7c23e627d09b542ae02381db3ca480e0242fca14e935" + ), + ), ], ) diff --git a/tests/test_otaclient/test_boot_control/test_jetson_cboot.py b/tests/test_otaclient/test_boot_control/test_jetson_cboot.py deleted file mode 100644 index 740a68b00..000000000 --- a/tests/test_otaclient/test_boot_control/test_jetson_cboot.py +++ /dev/null @@ -1,144 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -NOTE: this test file only test utils used in jetson-cboot. -""" - - -from __future__ import annotations - -import logging -from pathlib import Path -from typing import Any - -import pytest - -from otaclient.boot_control import _jetson_cboot -from otaclient.boot_control._jetson_common import ( - BSPVersion, - FirmwareBSPVersion, - SlotID, - parse_bsp_version, - update_extlinux_cfg, -) -from tests.conftest import TEST_DIR - -logger = logging.getLogger(__name__) - -MODULE_NAME = _jetson_cboot.__name__ -TEST_DATA_DIR = TEST_DIR / "data" - - -def test_SlotID(): - SlotID("0") - SlotID("1") - with pytest.raises(ValueError): - SlotID("abc") - - -class TestBSPVersion: - - @pytest.mark.parametrize( - ["_in", "expected"], - ( - ("R32.5.2", BSPVersion(32, 5, 2)), - ("R32.6.1", BSPVersion(32, 6, 1)), - ("R35.4.1", BSPVersion(35, 4, 1)), - ), - ) - def test_parse(self, _in: str, expected: BSPVersion): - assert BSPVersion.parse(_in) == expected - - @pytest.mark.parametrize( - ["_in", "expected"], - ( - (BSPVersion(32, 5, 2), "R32.5.2"), - (BSPVersion(32, 6, 1), "R32.6.1"), - (BSPVersion(35, 4, 1), "R35.4.1"), - ), - ) - def test_dump(self, _in: BSPVersion, expected: str): - assert BSPVersion.dump(_in) == expected - - -@pytest.mark.parametrize( - ["_in", "expected"], - ( - ( - { - "slot_a": None, - "slot_b": None, - }, - FirmwareBSPVersion(), - ), - ( - { - "slot_a": None, - "slot_b": "R32.6.1", - }, - FirmwareBSPVersion(slot_a=None, slot_b=BSPVersion(32, 6, 1)), - ), - ( - { - "slot_a": "R32.5.2", - "slot_b": "R32.6.1", - }, - FirmwareBSPVersion( - slot_a=BSPVersion(32, 5, 2), slot_b=BSPVersion(32, 6, 1) - ), - ), - ), -) -def test_FirmwareBSPVersion(_in: dict[str, Any], expected: FirmwareBSPVersion): - assert FirmwareBSPVersion.model_validate(_in) == expected - - -@pytest.mark.parametrize( - ["_in", "expected"], - ( - ( - ( - "# R32 (release), REVISION: 6.1, GCID: 27863751, BOARD: t186ref, EABI: aarch64, DATE: Mon Jul 26 19:36:31 UTC 2021", - BSPVersion(32, 6, 1), - ), - ( - "# R35 (release), REVISION: 4.1, GCID: 33958178, BOARD: t186ref, EABI: aarch64, DATE: Tue Aug 1 19:57:35 UTC 2023", - BSPVersion(35, 4, 1), - ), - ) - ), -) -def test_parse_bsp_version(_in: str, expected: BSPVersion): - assert parse_bsp_version(_in) == expected - - -@pytest.mark.parametrize( - ["_template_f", "_updated_f", "partuuid"], - ( - ( - "extlinux.conf-r35.4.1-template1", - "extlinux.conf-r35.4.1-updated1", - "11aa-bbcc-22dd", - ), - ( - "extlinux.conf-r35.4.1-template2", - "extlinux.conf-r35.4.1-updated2", - "11aa-bbcc-22dd", - ), - ), -) -def test_update_extlinux_conf(_template_f: Path, _updated_f: Path, partuuid: str): - _in = (TEST_DATA_DIR / _template_f).read_text() - _expected = (TEST_DATA_DIR / _updated_f).read_text() - assert update_extlinux_cfg(_in, partuuid) == _expected diff --git a/tests/test_otaclient/test_boot_control/test_jetson_common.py b/tests/test_otaclient/test_boot_control/test_jetson_common.py new file mode 100644 index 000000000..884447c53 --- /dev/null +++ b/tests/test_otaclient/test_boot_control/test_jetson_common.py @@ -0,0 +1,322 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for Jetson device boot control implementation common.""" + + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import pytest + +from otaclient.boot_control._jetson_common import ( + SLOT_A, + SLOT_B, + BSPVersion, + FirmwareBSPVersionControl, + SlotBSPVersion, + SlotID, + detect_external_rootdev, + get_nvbootctrl_conf_tnspec, + get_partition_devpath, + parse_nv_tegra_release, + update_extlinux_cfg, +) +from tests.conftest import TEST_DIR + +TEST_DATA_DIR = TEST_DIR / "data" + + +@pytest.mark.parametrize( + "_in, _expect, _exc", + ( + (SLOT_A, SLOT_A, None), + (SLOT_B, SLOT_B, None), + ("0", SLOT_A, None), + ("1", SLOT_B, None), + ("not_a_valid_slot_id", None, ValueError), + ), +) +def test_slot_id(_in: Any, _expect: SlotID | None, _exc: type[Exception] | None): + def _test(): + assert SlotID(_in) == _expect + + if _exc: + with pytest.raises(_exc): + return _test() + else: + _test() + + +class TestBSPVersion: + + @pytest.mark.parametrize( + "_in, _expect, _exc", + ( + ("R32.6.1", BSPVersion(32, 6, 1), None), + ("r32.6.1", BSPVersion(32, 6, 1), None), + ("32.6.1", BSPVersion(32, 6, 1), None), + ("R35.4.1", BSPVersion(35, 4, 1), None), + ("1.22.333", BSPVersion(1, 22, 333), None), + ("not_a_valid_bsp_ver", None, ValueError), + (123, None, ValueError), + ), + ) + def test_parse( + self, _in: Any, _expect: BSPVersion | None, _exc: type[Exception] | None + ): + def _test(): + assert BSPVersion.parse(_in) == _expect + + if _exc: + with pytest.raises(_exc): + _test() + else: + _test() + + @pytest.mark.parametrize( + "_in, _expect", + ( + (BSPVersion(35, 4, 1), "R35.4.1"), + (BSPVersion(32, 6, 1), "R32.6.1"), + (BSPVersion(1, 22, 333), "R1.22.333"), + ), + ) + def test_dump(self, _in: BSPVersion, _expect: str): + assert _in.dump() == _expect + + +class TestSlotBSPVersion: + + @pytest.mark.parametrize( + "_in, _slot, _bsp_ver, _expect", + ( + ( + SlotBSPVersion(), + SLOT_A, + BSPVersion(32, 6, 1), + SlotBSPVersion(slot_a=BSPVersion(32, 6, 1)), + ), + ( + SlotBSPVersion( + slot_a=BSPVersion(32, 5, 1), slot_b=BSPVersion(32, 6, 1) + ), + SLOT_B, + None, + SlotBSPVersion(slot_a=BSPVersion(32, 5, 1), slot_b=None), + ), + ( + SlotBSPVersion( + slot_a=BSPVersion(32, 5, 1), slot_b=BSPVersion(32, 6, 1) + ), + SLOT_A, + None, + SlotBSPVersion(slot_a=None, slot_b=BSPVersion(32, 6, 1)), + ), + ), + ) + def test_set_by_slot( + self, + _in: SlotBSPVersion, + _slot: SlotID, + _bsp_ver: BSPVersion | None, + _expect: SlotBSPVersion, + ): + _in.set_by_slot(_slot, _bsp_ver) + assert _in == _expect + + @pytest.mark.parametrize( + "_in, _slot, _expect", + ( + ( + SlotBSPVersion(), + SLOT_A, + None, + ), + ( + SlotBSPVersion( + slot_a=BSPVersion(32, 5, 1), slot_b=BSPVersion(32, 6, 1) + ), + SLOT_B, + BSPVersion(32, 6, 1), + ), + ( + SlotBSPVersion( + slot_a=BSPVersion(32, 5, 1), slot_b=BSPVersion(32, 6, 1) + ), + SLOT_A, + BSPVersion(32, 5, 1), + ), + ), + ) + def test_get_by_slot( + self, + _in: SlotBSPVersion, + _slot: SlotID, + _expect: BSPVersion | None, + ): + assert _in.get_by_slot(_slot) == _expect + + @pytest.mark.parametrize( + "_in", + ( + (SlotBSPVersion()), + (SlotBSPVersion(slot_a=BSPVersion(32, 5, 1))), + (SlotBSPVersion(slot_a=BSPVersion(35, 4, 1), slot_b=BSPVersion(35, 5, 0))), + ), + ) + def test_load_and_dump(self, _in: SlotBSPVersion): + assert SlotBSPVersion.model_validate_json(_in.model_dump_json()) == _in + + +class TestFirmwareBSPVersionControl: + + @pytest.fixture(autouse=True) + def setup_test(self, tmp_path: Path): + self.test_fw_bsp_vf = tmp_path / "firmware_bsp_version" + self.slot_b_ver = BSPVersion(35, 5, 0) + self.slot_a_ver = BSPVersion(35, 4, 1) + + def test_init(self): + self.test_fw_bsp_vf.write_text( + SlotBSPVersion(slot_b=self.slot_b_ver).model_dump_json() + ) + + loaded = FirmwareBSPVersionControl( + SLOT_A, + self.slot_a_ver, + current_bsp_version_file=self.test_fw_bsp_vf, + ) + + # NOTE: FirmwareBSPVersionControl will not use the information for current slot. + assert loaded.current_slot_bsp_ver == self.slot_a_ver + assert loaded.standby_slot_bsp_ver == self.slot_b_ver + + def test_write_to_file(self): + self.test_fw_bsp_vf.write_text( + SlotBSPVersion(slot_b=self.slot_b_ver).model_dump_json() + ) + loaded = FirmwareBSPVersionControl( + SLOT_A, + self.slot_a_ver, + current_bsp_version_file=self.test_fw_bsp_vf, + ) + loaded.write_to_file(self.test_fw_bsp_vf) + + assert ( + self.test_fw_bsp_vf.read_text() + == SlotBSPVersion( + slot_a=self.slot_a_ver, slot_b=self.slot_b_ver + ).model_dump_json() + ) + + +@pytest.mark.parametrize( + "_in, _expect", + ( + ( + "# R32 (release), REVISION: 6.1, GCID: 27863751, BOARD: t186ref, EABI: aarch64, DATE: Mon Jul 26 19:36:31 UTC 2021", + BSPVersion(32, 6, 1), + ), + ( + "# R35 (release), REVISION: 4.1, GCID: 33958178, BOARD: t186ref, EABI: aarch64, DATE: Tue Aug 1 19:57:35 UTC 2023", + BSPVersion(35, 4, 1), + ), + ( + "# R35 (release), REVISION: 5.0, GCID: 35550185, BOARD: t186ref, EABI: aarch64, DATE: Tue Feb 20 04:46:31 UTC 2024", + BSPVersion(35, 5, 0), + ), + ), +) +def test_parse_nv_tegra_release(_in: str, _expect: BSPVersion): + assert parse_nv_tegra_release(_in) == _expect + + +@pytest.mark.parametrize( + ["_template_f", "_updated_f", "partuuid"], + ( + ( + "extlinux.conf-r35.4.1-template1", + "extlinux.conf-r35.4.1-updated1", + "11aa-bbcc-22dd", + ), + ( + "extlinux.conf-r35.4.1-template2", + "extlinux.conf-r35.4.1-updated2", + "11aa-bbcc-22dd", + ), + ), +) +def test_update_extlinux_conf(_template_f: Path, _updated_f: Path, partuuid: str): + _in = (TEST_DATA_DIR / _template_f).read_text() + _expected = (TEST_DATA_DIR / _updated_f).read_text() + assert update_extlinux_cfg(_in, partuuid) == _expected + + +@pytest.mark.parametrize( + "parent_devpath, is_external_rootdev", + ( + ("/dev/mmcblk0", False), + ("/dev/mmcblk1", True), + ("/dev/sda", True), + ("/dev/nvme0n1", True), + ), +) +def test_detect_external_rootdev(parent_devpath, is_external_rootdev): + assert detect_external_rootdev(parent_devpath) is is_external_rootdev + + +@pytest.mark.parametrize( + "parent_devpath, partition_id, expected", + ( + ("/dev/mmcblk0", 1, "/dev/mmcblk0p1"), + ("/dev/mmcblk1", 1, "/dev/mmcblk1p1"), + ("/dev/sda", 1, "/dev/sda1"), + ("/dev/nvme0n1", 1, "/dev/nvme0n1p1"), + ), +) +def test_get_partition_devpath(parent_devpath, partition_id, expected): + assert get_partition_devpath(parent_devpath, partition_id) == expected + + +@pytest.mark.parametrize( + "nvbooctrl_conf, expected", + ( + ( + """\ +TNSPEC 2888-400-0004-M.0-1-2-jetson-xavier-rqx580- +TEGRA_CHIPID 0x19 +TEGRA_OTA_BOOT_DEVICE /dev/mmcblk0boot0 +TEGRA_OTA_GPT_DEVICE /dev/mmcblk0boot1 +""", + "2888-400-0004-M.0-1-2-jetson-xavier-rqx580-", + ), + ( + """\ +TNSPEC 3701-500-0005-A.0-1-0-cti-orin-agx-agx201-00- +TEGRA_CHIPID 0x23 +TEGRA_OTA_BOOT_DEVICE /dev/mtdblock0 +TEGRA_OTA_GPT_DEVICE /dev/mtdblock0 +""", + "3701-500-0005-A.0-1-0-cti-orin-agx-agx201-00-", + ), + ( + "not a nvbooctrl file", + None, + ), + ), +) +def test_get_nvbootctrl_conf_tnspec(nvbooctrl_conf, expected): + assert get_nvbootctrl_conf_tnspec(nvbooctrl_conf) == expected diff --git a/tests/test_otaclient/test_boot_control/test_jetson_uefi.py b/tests/test_otaclient/test_boot_control/test_jetson_uefi.py new file mode 100644 index 000000000..2816bb6aa --- /dev/null +++ b/tests/test_otaclient/test_boot_control/test_jetson_uefi.py @@ -0,0 +1,307 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +from pathlib import Path + +import pytest +from pytest_mock import MockerFixture + +from otaclient.boot_control import _jetson_uefi +from otaclient.boot_control._jetson_common import ( + BSPVersion, + NVBootctrlExecError, + SlotID, +) +from otaclient.boot_control._jetson_uefi import ( + JetsonUEFIBootControlError, + L4TLauncherBSPVersionControl, + NVBootctrlJetsonUEFI, + _detect_esp_dev, + _detect_ota_bootdev_is_qspi, + _l4tlauncher_version_control, +) + +MODULE = _jetson_uefi.__name__ + + +class TestNVBootctrlJetsonUEFI: + + @pytest.mark.parametrize( + "_input, expected", + ( + ( + """\ +Current version: 35.4.1 +Capsule update status: 1 +Current bootloader slot: A +Active bootloader slot: A +num_slots: 2 +slot: 0, status: normal +slot: 1, status: normal + """, + BSPVersion.parse("35.4.1"), + ), + ( + """\ +Current version: 36.3.0 +Capsule update status: 0 +Current bootloader slot: A +Active bootloader slot: A +num_slots: 2 +slot: 0, status: normal +slot: 1, status: normal + """, + BSPVersion.parse("36.3.0"), + ), + ( + """\ +Current version: 0.0.0 +Current bootloader slot: A +Active bootloader slot: A +num_slots: 2 +slot: 0, status: normal +slot: 1, status: normal + """, + NVBootctrlExecError(), + ), + ), + ) + def test_get_current_fw_bsp_version( + self, _input: str, expected: BSPVersion | Exception, mocker: MockerFixture + ): + mocker.patch.object( + NVBootctrlJetsonUEFI, + "dump_slots_info", + mocker.MagicMock(return_value=_input), + ) + + if isinstance(expected, Exception): + with pytest.raises(type(expected)): + NVBootctrlJetsonUEFI.get_current_fw_bsp_version() + else: + assert NVBootctrlJetsonUEFI.get_current_fw_bsp_version() == expected + + @pytest.mark.parametrize( + "_input, expected", + ( + ( + """\ +Current version: 35.4.1 +Capsule update status: 1 +Current bootloader slot: A +Active bootloader slot: A +num_slots: 2 +slot: 0, status: normal +slot: 1, status: normal + """, + SlotID("0"), + ), + ( + """\ +Current version: 35.4.1 +Capsule update status: 1 +Current bootloader slot: A +Active bootloader slot: B +num_slots: 2 +slot: 0, status: normal +slot: 1, status: normal + """, + SlotID("1"), + ), + ), + ) + def test_get_active_bootloader_slot( + self, _input: str, expected: SlotID, mocker: MockerFixture + ): + mocker.patch.object( + NVBootctrlJetsonUEFI, + "dump_slots_info", + mocker.MagicMock(return_value=_input), + ) + assert NVBootctrlJetsonUEFI.get_active_bootloader_slot() == expected + + +@pytest.mark.parametrize( + "all_esp_devs, boot_parent_devpath, expected", + ( + ( + ["/dev/mmcblk0p41", "/dev/nvme0n1p41"], + "/dev/nvme0n1", + "/dev/nvme0n1p41", + ), + ( + ["/dev/mmcblk0p41"], + "/dev/mmcblk0", + "/dev/mmcblk0p41", + ), + ( + ["/dev/sda23", "/dev/mmcblk0p41"], + "/dev/nvme0n1", + JetsonUEFIBootControlError(), + ), + ( + [], + "/dev/nvme0n1", + JetsonUEFIBootControlError(), + ), + ), +) +def test__detect_esp_dev( + all_esp_devs, boot_parent_devpath, expected, mocker: MockerFixture +): + mocker.patch( + f"{MODULE}.CMDHelperFuncs.get_dev_by_token", + mocker.MagicMock(return_value=all_esp_devs), + ) + + if isinstance(expected, Exception): + with pytest.raises(type(expected)): + _detect_esp_dev(boot_parent_devpath) + else: + assert _detect_esp_dev(boot_parent_devpath) == expected + + +@pytest.mark.parametrize( + "nvbootctrl_conf, expected", + ( + ( + """\ +TNSPEC 3701-500-0005-A.0-1-0-cti-orin-agx-agx201-00- +TEGRA_CHIPID 0x23 +TEGRA_OTA_BOOT_DEVICE /dev/mtdblock0 +TEGRA_OTA_GPT_DEVICE /dev/mtdblock0 +""", + True, + ), + ( + """\ +TNSPEC 2888-400-0004-M.0-1-2-jetson-xavier-rqx580- +TEGRA_CHIPID 0x19 +TEGRA_OTA_BOOT_DEVICE /dev/mmcblk0boot0 +TEGRA_OTA_GPT_DEVICE /dev/mmcblk0boot1 +""", + False, + ), + ("", None), + ), +) +def test__detect_ota_bootdev_is_qspi(nvbootctrl_conf, expected): + assert _detect_ota_bootdev_is_qspi(nvbootctrl_conf) is expected + + +class TestL4TLauncherBSPVersionControl: + + @pytest.mark.parametrize( + "_in, expected", + _test_case := ( + ( + "R35.4.1:ac17457772666351154a5952e3b87851a6398da2afcf3a38bedfc0925760bb0e", + L4TLauncherBSPVersionControl( + bsp_ver=BSPVersion(35, 4, 1), + sha256_digest="ac17457772666351154a5952e3b87851a6398da2afcf3a38bedfc0925760bb0e", + ), + ), + ( + "R35.5.0:3928e0feb84e37db87dc6705a02eec95a6fca2dccd3467b61fa66ed8e1046b67", + L4TLauncherBSPVersionControl( + bsp_ver=BSPVersion(35, 5, 0), + sha256_digest="3928e0feb84e37db87dc6705a02eec95a6fca2dccd3467b61fa66ed8e1046b67", + ), + ), + ( + "R36.3.0:b14fa3623f4078d05573d9dcf2a0b46ea2ae07d6b75d9843f9da6ff24db13718", + L4TLauncherBSPVersionControl( + bsp_ver=BSPVersion(36, 3, 0), + sha256_digest="b14fa3623f4078d05573d9dcf2a0b46ea2ae07d6b75d9843f9da6ff24db13718", + ), + ), + ), + ) + def test_parse(self, _in, expected): + assert L4TLauncherBSPVersionControl.parse(_in) == expected + + @pytest.mark.parametrize( + "to_be_dumped, expected", + tuple((entry[1], entry[0]) for entry in _test_case), + ) + def test_dump(self, to_be_dumped: L4TLauncherBSPVersionControl, expected: str): + assert to_be_dumped.dump() == expected + + +@pytest.mark.parametrize( + "ver_ctrl, l4tlauncher_digest, current_bsp_ver, expected_bsp_ver, expected_ver_ctrl", + ( + # valid version file, hash matched + ( + "R36.3.0:b14fa3623f4078d05573d9dcf2a0b46ea2ae07d6b75d9843f9da6ff24db13718", + "b14fa3623f4078d05573d9dcf2a0b46ea2ae07d6b75d9843f9da6ff24db13718", + BSPVersion(1, 2, 3), + BSPVersion(36, 3, 0), + "R36.3.0:b14fa3623f4078d05573d9dcf2a0b46ea2ae07d6b75d9843f9da6ff24db13718", + ), + # no version file, lookup table hit + ( + "invalid_version_file", + "b14fa3623f4078d05573d9dcf2a0b46ea2ae07d6b75d9843f9da6ff24db13718", + BSPVersion(1, 2, 3), + BSPVersion(36, 3, 0), + "R36.3.0:b14fa3623f4078d05573d9dcf2a0b46ea2ae07d6b75d9843f9da6ff24db13718", + ), + # valid version file, hash mismatched, use slot BSP version + ( + "R36.3.0:b14fa3623f4078d05573d9dcf2a0b46ea2ae07d6b75d9843f9da6ff24db13718", + "hash_mismatched", + BSPVersion(1, 2, 3), + BSPVersion(1, 2, 3), + "R1.2.3:hash_mismatched", + ), + # no version file, lookup table unhit + ( + "invalid_version_file", + "not_recorded_hash", + BSPVersion(1, 2, 3), + BSPVersion(1, 2, 3), + "R1.2.3:not_recorded_hash", + ), + ), +) +def test__l4tlauncher_version_control( + ver_ctrl, + l4tlauncher_digest, + current_bsp_ver, + expected_bsp_ver, + expected_ver_ctrl, + tmp_path: Path, + mocker: MockerFixture, +): + ver_control_f = tmp_path / "l4tlauncher_ver_control" + ver_control_f.write_text(ver_ctrl) + + mocker.patch( + f"{MODULE}.file_sha256", mocker.MagicMock(return_value=l4tlauncher_digest) + ) + + assert ( + _l4tlauncher_version_control( + ver_control_f, + "any", + current_slot_bsp_ver=current_bsp_ver, + ) + == expected_bsp_ver + ) + # ensure the version control file is expected + assert ver_control_f.read_text() == expected_ver_ctrl diff --git a/tests/test_otaclient_common/test_common.py b/tests/test_otaclient_common/test_common.py index 28589fe22..2d8765c9b 100644 --- a/tests/test_otaclient_common/test_common.py +++ b/tests/test_otaclient_common/test_common.py @@ -26,6 +26,7 @@ import pytest +from otaclient_common import replace_root from otaclient_common.common import ( copytree_identical, ensure_otaproxy_start, @@ -347,3 +348,24 @@ def test_subprocess_check_output_succeeded(self): output = subprocess_check_output(cmd, raise_exception=True) assert output == self.TEST_FILE_CONTENTS + + +@pytest.mark.parametrize( + "path, old_root, new_root, expected", + ( + ( + "/a/canonical/fpath", + "/", + "/mnt/standby_mp", + "/mnt/standby_mp/a/canonical/fpath", + ), + ( + "/a/canonical/dpath/", + "/", + "/mnt/standby_mp/", + "/mnt/standby_mp/a/canonical/dpath/", + ), + ), +) +def get_replace_root(path, old_root, new_root, expected): + assert replace_root(path, old_root, new_root) == expected