From 134706281992723207cade58009ec0a3051e7c64 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 9 May 2024 15:39:47 +0000 Subject: [PATCH] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- otaclient/_utils/__init__.py | 9 ++- otaclient/_utils/linux.py | 5 ++ otaclient/_utils/typing.py | 7 +- otaclient/app/boot_control/_common.py | 47 ++++++++---- otaclient/app/boot_control/_grub.py | 75 ++++++++++++------- otaclient/app/boot_control/_jetson_cboot.py | 24 ++++-- otaclient/app/boot_control/_rpi_boot.py | 22 ++++-- otaclient/app/boot_control/configs.py | 3 +- otaclient/app/boot_control/protocol.py | 21 +++--- otaclient/app/boot_control/selecter.py | 1 + otaclient/app/common.py | 25 +++++-- otaclient/app/configs.py | 1 + otaclient/app/create_standby/common.py | 5 +- otaclient/app/create_standby/interface.py | 8 +- otaclient/app/create_standby/rebuild_mode.py | 1 + otaclient/app/downloader.py | 30 +++++--- otaclient/app/errors.py | 10 ++- otaclient/app/log_setting.py | 3 +- otaclient/app/ota_client.py | 15 ++-- otaclient/app/ota_client_stub.py | 37 ++++++--- otaclient/app/ota_metadata.py | 19 +++-- otaclient/app/proto/__init__.py | 3 +- otaclient/app/proto/_common.py | 46 +++++++----- otaclient/app/proto/_ota_metafiles_wrapper.py | 6 +- .../app/proto/_otaclient_v2_pb2_wrapper.py | 6 +- otaclient/app/proto/otaclient_v2_pb2_grpc.py | 21 ++++-- otaclient/app/proto/streamer.py | 3 +- otaclient/app/update_stats.py | 2 + otaclient/configs/__init__.py | 2 +- otaclient/configs/ecu_info.py | 2 + otaclient/configs/proxy_info.py | 9 ++- otaclient/ota_proxy/__init__.py | 9 ++- otaclient/ota_proxy/cache_control.py | 2 + otaclient/ota_proxy/db.py | 23 ++++-- otaclient/ota_proxy/orm.py | 20 +++-- otaclient/ota_proxy/ota_cache.py | 41 +++++++--- otaclient/ota_proxy/server_app.py | 22 ++++-- pyproject.toml | 11 ++- tests/conftest.py | 1 + tests/test_boot_control/test_rpi_boot.py | 5 +- tests/test_ota_client_stub.py | 9 +-- tests/test_ota_proxy/test_cachedb.py | 9 +-- tests/test_ota_proxy/test_ota_cache.py | 12 +-- tests/test_persist_file_handling.py | 1 - .../test_proto/test_otaclient_pb2_wrapper.py | 8 +- tests/test_update_stats.py | 7 +- tools/offline_ota_image_builder/__main__.py | 1 + tools/offline_ota_image_builder/builder.py | 3 +- tools/offline_ota_image_builder/manifest.py | 3 +- tools/status_monitor/ecu_status_box.py | 15 ++-- tools/status_monitor/utils.py | 4 +- 51 files changed, 440 insertions(+), 234 deletions(-) diff --git a/otaclient/_utils/__init__.py b/otaclient/_utils/__init__.py index dd75b77f2..9d91ae84f 100644 --- a/otaclient/_utils/__init__.py +++ b/otaclient/_utils/__init__.py @@ -29,9 +29,11 @@ def copy_callable_typehint(_source: Callable[P, Any]): """This helper function return a decorator that can type hint the target function as the _source function. - At runtime, this decorator actually does nothing, but just return the input function as it. - But the returned function will have the same type hint as the source function in ide. - It will not impact the runtime behavior of the decorated function. + At runtime, this decorator actually does nothing, but just return + the input function as it. But the returned function will have the + same type hint as the source function in ide. It will not impact the + runtime behavior of the decorated function. + """ def _decorator(target) -> Callable[P, Any]: @@ -76,6 +78,7 @@ def replace_root(path: str | Path, old_root: str | Path, new_root: str | Path) - For example, if path="/abc", old_root="/", new_root="/new_root", then we will have "/new_root/abc". + """ # normalize all the input args path = os.path.normpath(path) diff --git a/otaclient/_utils/linux.py b/otaclient/_utils/linux.py index 4c94596d9..52091ed8f 100644 --- a/otaclient/_utils/linux.py +++ b/otaclient/_utils/linux.py @@ -41,6 +41,7 @@ def create_swapfile( Raises: ValueError on file already exists at , SubprocessCallFailed on failed swapfile creation. + """ swapfile_fpath = Path(swapfile_fpath) if swapfile_fpath.exists(): @@ -86,6 +87,7 @@ class ParsedPasswd: Attrs: _by_name (dict[str, int]): name:uid mapping. _by_uid (dict[int, str]): uid:name mapping. + """ __slots__ = ["_by_name", "_by_uid"] @@ -112,6 +114,7 @@ class ParsedGroup: Attrs: _by_name (dict[str, int]): name:gid mapping. _by_gid (dict[int, str]): gid:name mapping. + """ __slots__ = ["_by_name", "_by_gid"] @@ -133,6 +136,7 @@ def map_uid_by_pwnam(*, src_db: ParsedPasswd, dst_db: ParsedPasswd, uid: int) -> Raises: ValueError on failed mapping. + """ try: return dst_db._by_name[src_db._by_uid[uid]] @@ -145,6 +149,7 @@ def map_gid_by_grpnam(*, src_db: ParsedGroup, dst_db: ParsedGroup, gid: int) -> Raises: ValueError on failed mapping. + """ try: return dst_db._by_name[src_db._by_gid[gid]] diff --git a/otaclient/_utils/typing.py b/otaclient/_utils/typing.py index 451ce4f19..5616c2840 100644 --- a/otaclient/_utils/typing.py +++ b/otaclient/_utils/typing.py @@ -33,11 +33,12 @@ def gen_strenum_validator(enum_type: type[T]) -> Callable[[T | str | Any], T]: - """A before validator generator that converts input value into enum - before passing it to pydantic validator. + """A before validator generator that converts input value into enum before + passing it to pydantic validator. NOTE(20240129): as upto pydantic v2.5.3, (str, Enum) field cannot - pass strict validation if input is str. + pass strict validation if input is str. + """ def _inner(value: T | str | Any) -> T: diff --git a/otaclient/app/boot_control/_common.py b/otaclient/app/boot_control/_common.py index 253a8342c..c2486fe9e 100644 --- a/otaclient/app/boot_control/_common.py +++ b/otaclient/app/boot_control/_common.py @@ -42,9 +42,10 @@ class CMDHelperFuncs: """HelperFuncs bundle for wrapped linux cmd. - When underlying subprocess call failed and is True, - functions defined in this class will raise the original exception - to the upper caller. + When underlying subprocess call failed and is + True, functions defined in this class will raise the original + exception to the upper caller. + """ @classmethod @@ -64,6 +65,7 @@ def get_attrs_by_dev( Returns: str: of . + """ cmd = ["lsblk", "-ino", attr, str(dev)] return subprocess_check_output(cmd, raise_exception=raise_exception) @@ -86,6 +88,7 @@ def get_dev_by_token( Returns: Optional[list[str]]: If there is at least one device found, return a list contains all found device(s), otherwise None. + """ cmd = ["blkid", "-o", "device", "-t", f"{token}={value}"] if res := subprocess_check_output(cmd, raise_exception=raise_exception): @@ -104,6 +107,7 @@ def get_current_rootfs_dev(cls, *, raise_exception: bool = True) -> str: Returns: str: the devpath of current rootfs device. + """ cmd = ["findmnt", "-nfco", "SOURCE", cfg.ACTIVE_ROOTFS_PATH] return subprocess_check_output(cmd, raise_exception=raise_exception) @@ -125,6 +129,7 @@ def get_mount_point_by_dev(cls, dev: str, *, raise_exception: bool = True) -> st Returns: str: the FIRST mountpint of the , or empty string if is False and the subprocess call failed(due to dev is not mounted or other reasons). + """ cmd = ["findmnt", "-nfo", "TARGET", dev] return subprocess_check_output(cmd, raise_exception=raise_exception) @@ -145,6 +150,7 @@ def get_dev_by_mount_point( Returns: str: the source device of . + """ cmd = ["findmnt", "-no", "SOURCE", mount_point] return subprocess_check_output(cmd, raise_exception=raise_exception) @@ -153,7 +159,8 @@ def get_dev_by_mount_point( def is_target_mounted( cls, target: Path | str, *, raise_exception: bool = True ) -> bool: - """Check if is mounted or not. can be a dev or a mount point. + """Check if is mounted or not. can be a dev or a + mount point. This is implemented by calling: findmnt @@ -165,6 +172,7 @@ def is_target_mounted( Returns: bool: return True if the target has at least one mount_point. + """ cmd = ["findmnt", target] return bool(subprocess_check_output(cmd, raise_exception=raise_exception)) @@ -185,6 +193,7 @@ def get_parent_dev(cls, child_device: str, *, raise_exception: bool = True) -> s Returns: str: the parent device of the specific . + """ cmd = ["lsblk", "-idpno", "PKNAME", child_device] return subprocess_check_output(cmd, raise_exception=raise_exception) @@ -201,6 +210,7 @@ def set_ext4_fslabel(cls, dev: str, fslabel: str, *, raise_exception: bool = Tru fslabel (str): the fslabel to be set. raise_exception (bool, optional): raise exception on subprocess call failed. Defaults to True. + """ cmd = ["e2label", dev, fslabel] subprocess_call(cmd, raise_exception=raise_exception) @@ -222,6 +232,7 @@ def mount_rw( mount_point (Path | str): mount point to mount to. raise_exception (bool, optional): raise exception on subprocess call failed. Defaults to True. + """ # fmt: off cmd = [ @@ -248,6 +259,7 @@ def bind_mount_ro( mount_point (Path | str): mount point to mount to. raise_exception (bool, optional): raise exception on subprocess call failed. Defaults to True. + """ # fmt: off cmd = [ @@ -274,6 +286,7 @@ def umount(cls, target: Path | str, *, raise_exception: bool = True): target (Path | str): target to be umounted. raise_exception (bool, optional): raise exception on subprocess call failed. Defaults to True. + """ # first try to check whether the target(either a mount point or a dev) # is mounted @@ -293,8 +306,8 @@ def mkfs_ext4( fsuuid: Optional[str] = None, raise_exception: bool = True, ): - """Create new ext4 formatted filesystem on , optionally with - and/or . + """Create new ext4 formatted filesystem on , optionally with + and/or . Args: dev (str): device to be formatted to ext4. @@ -304,6 +317,7 @@ def mkfs_ext4( When it is None, this function will try to preserve the previous fsuuid. raise_exception (bool, optional): raise exception on subprocess call failed. Defaults to True. + """ cmd = ["mkfs.ext4", "-F"] @@ -347,6 +361,7 @@ def mount_ro( mount_point (str | Path): mount point to mount to. raise_exception (bool, optional): raise exception on subprocess call failed. Defaults to True. + """ # NOTE: set raise_exception to false to allow not mounted # not mounted dev will have empty return str @@ -384,6 +399,7 @@ def reboot(cls, args: Optional[list[str]] = None) -> NoReturn: Args: args (Optional[list[str]], optional): args passed to reboot command. Defaults to None, not passing any args. + """ cmd = ["reboot"] if args: @@ -410,6 +426,7 @@ class OTAStatusFilesControl: If status or slot_in_use file is missing, initialization is required. status will be set to INITIALIZED, and slot_in_use will be set to current slot. + """ def __init__( @@ -570,7 +587,8 @@ def _store_standby_version(self, _version: str): # helper methods def _is_switching_boot(self, active_slot: str) -> bool: - """Detect whether we should switch boot or not with ota_status files.""" + """Detect whether we should switch boot or not with ota_status + files.""" # evidence: ota_status _is_updating_or_rollbacking = self._load_current_status() in [ wrapper.StatusOta.UPDATING, @@ -592,16 +610,17 @@ def _is_switching_boot(self, active_slot: str) -> bool: # boot control used methods def pre_update_current(self): - """On pre_update stage, set current slot's status to FAILURE - and set slot_in_use to standby slot.""" + """On pre_update stage, set current slot's status to FAILURE and set + slot_in_use to standby slot.""" self._store_current_status(wrapper.StatusOta.FAILURE) self._store_current_slot_in_use(self.standby_slot) def pre_update_standby(self, *, version: str): - """On pre_update stage, set standby slot's status to UPDATING, - set slot_in_use to standby slot, and set version. + """On pre_update stage, set standby slot's status to UPDATING, set + slot_in_use to standby slot, and set version. NOTE: expecting standby slot to be mounted and ready for use! + """ # create the ota-status folder unconditionally self.standby_ota_status_dir.mkdir(exist_ok=True, parents=True) @@ -640,6 +659,7 @@ def booted_ota_status(self) -> wrapper.StatusOta: This property is only meant to be used once when otaclient starts up, switch to use live_ota_status by otaclient after otaclient is running. + """ return self._ota_status @@ -696,8 +716,9 @@ def mount_active(self) -> None: def preserve_ota_folder_to_standby(self): """Copy the /boot/ota folder to standby slot to preserve it. - /boot/ota folder contains the ota setting for this device, - so we should preserve it for each slot, accross each update. + /boot/ota folder contains the ota setting for this device, so we + should preserve it for each slot, accross each update. + """ logger.debug("copy /boot/ota from active to standby.") try: diff --git a/otaclient/app/boot_control/_grub.py b/otaclient/app/boot_control/_grub.py index a927241c2..0ac03eb9e 100644 --- a/otaclient/app/boot_control/_grub.py +++ b/otaclient/app/boot_control/_grub.py @@ -13,21 +13,25 @@ # limitations under the License. """Implementation of grub boot control. -DEPRECATION WARNING(20231026): - Current mechanism of defining and detecting slots is proved to be not robust. - The design expects that rootfs device will always be sda, which might not be guaranteed - as the sdx naming scheme is based on the order of kernel recognizing block devices. - If rootfs somehow is not named as sda, the grub boot controller will fail to identifiy - the slots and/or finding corresponding ota-partition files, finally failing the OTA. - Also slots are detected by assuming the partition layout, which is less robust comparing to - cboot and rpi_boot implementation of boot controller. +DEPRECATION WARNING(20231026): Current mechanism of defining and +detecting slots is proved to be not robust. The design expects that +rootfs device will always be sda, which might not be guaranteed +as the sdx naming scheme is based on the order of kernel recognizing +block devices. If rootfs somehow is not named as sda, the grub +boot controller will fail to identifiy the slots and/or finding +corresponding ota-partition files, finally failing the OTA. Also +slots are detected by assuming the partition layout, which is less +robust comparing to cboot and rpi_boot implementation of boot +controller. TODO(20231026): design new mechanism to define and manage slot. -NOTE(20231027) A workaround fix is applied to handle the edge case of rootfs not named as sda, - Check GrubABPartitionDetector class for more details. - This workaround only means to avoid OTA failed on edge condition and maintain backward compatibility, - still expecting new mechanism to fundamentally resolve this issue. +NOTE(20231027) A workaround fix is applied to handle the edge case of +rootfs not named as sda, Check GrubABPartitionDetector class for +more details. This workaround only means to avoid OTA failed on edge +condition and maintain backward compatibility, still expecting new +mechanism to fundamentally resolve this issue. + """ @@ -142,14 +146,15 @@ def update_entry_rootfs( rootfs_str: str, start: int = 0, ) -> Optional[str]: - """Read in grub_cfg, update matched kernel entries' rootfs with , - and then return the updated one. + """Read in grub_cfg, update matched kernel entries' rootfs with + , and then return the updated one. Params: grub_cfg: input grub_cfg str kernel_ver: kernel version str for the target entry rootfs_str: a str that indicates which rootfs device to use, like root=UUID= + """ new_entry_block: Optional[str] = None entry_l, entry_r = None, None @@ -198,6 +203,7 @@ def get_entry(cls, grub_cfg: str, *, kernel_ver: str) -> Tuple[int, _GrubMenuEnt which is correct in most cases(recovery entry will always be after the normal boot entry, and we by defautl disable recovery entry). + """ for index, entry_ma in enumerate(cls.menuentry_pa.finditer(grub_cfg)): if _linux := cls.linux_pa.search(entry_ma.group()): @@ -218,6 +224,7 @@ def update_grub_default( 2. option that specified multiple times will be merged into one, and the latest specified value will be used, or predefined default value will be used if such value defined. + """ default_kvp = cls.grub_default_options.copy() if default_entry_idx is not None: @@ -292,6 +299,7 @@ class GrubABPartitionDetector: NOTE(20231027): as workaround to rootfs not sda breaking OTA, slot naming schema is fixed to "sda", and ota-partition folder is searched with this name. For example, if current slot's device is nvme0n1p3, the slot_name is sda3. + """ # assuming that the suffix digit are the partiton id, for example, @@ -349,6 +357,7 @@ def _detect_active_slot(self) -> Tuple[str, str]: Returns: A tuple contains the slot_name and the full dev path of the active slot. + """ try: dev_path = CMDHelperFuncs.get_current_rootfs_dev() @@ -371,6 +380,7 @@ def _detect_standby_slot(self, active_dev: str) -> Tuple[str, str]: Returns: A tuple contains the slot_name and the full dev path of the standby slot. + """ dev_path = self._get_sibling_dev(active_dev) _dev_path_ma = self.DEV_PATH_PA.match(dev_path) @@ -415,11 +425,12 @@ def __init__(self) -> None: @property def initialized(self) -> bool: - """Indicates whether grub_control migrates itself from non-OTA booted system, - or recovered from a ota_partition files corrupted boot. + """Indicates whether grub_control migrates itself from non-OTA booted + system, or recovered from a ota_partition files corrupted boot. + + Normally this property should be false, if it is true, + OTAStatusControl should also initialize itself. - Normally this property should be false, if it is true, OTAStatusControl should also - initialize itself. """ return self._grub_control_initialized @@ -434,6 +445,7 @@ def _check_active_slot_ota_partition_file(self): 1. this method only update the ota-partition./grub.cfg! 2. standby slot is not considered here! 3. expected booted kernel/initrd located under /boot. + """ # ------ check boot files ------ # vmlinuz_active_slot = self.active_ota_partition_folder / GrubHelper.KERNEL_OTA @@ -507,6 +519,7 @@ def _get_current_booted_files() -> Tuple[str, str]: """Return the name of booted kernel and initrd. Expected booted kernel and initrd are located under /boot. + """ boot_cmdline = cat_proc_cmdline() if kernel_ma := re.search( @@ -526,9 +539,11 @@ def _get_current_booted_files() -> Tuple[str, str]: @staticmethod def _prepare_kernel_initrd_links(target_folder: Path): - """Prepare OTA symlinks for kernel/initrd under specific ota-partition folder. - vmlinuz-ota -> vmlinuz-* - initrd-ota -> initrd-* + """Prepare OTA symlinks for kernel/initrd under specific ota-partition + folder. + + vmlinuz-ota -> vmlinuz-* initrd-ota -> initrd-* + """ kernel, initrd = None, None # NOTE(20230914): if multiple kernels presented, the first found @@ -559,6 +574,7 @@ def _grub_update_on_booted_slot(self, *, abort_on_standby_missed=True): NOTE: 1. this method only ensures the entry existence for current booted slot. 2. this method ensures the default entry to be the current booted slot. + """ grub_default_file = Path(cfg.ACTIVE_ROOTFS_PATH) / Path( cfg.DEFAULT_GRUB_PATH @@ -650,8 +666,8 @@ def _grub_update_on_booted_slot(self, *, abort_on_standby_missed=True): logger.info(f"update_grub for {self.active_slot} finished.") def _ensure_ota_partition_symlinks(self, active_slot: str): - """Ensure /boot/{ota_partition,vmlinuz-ota,initrd.img-ota} symlinks from - specified point's of view.""" + """Ensure /boot/{ota_partition,vmlinuz-ota,initrd.img-ota} symlinks + from specified point's of view.""" ota_partition_folder = Path(cfg.BOOT_OTA_PARTITION_FILE) # ota-partition re_symlink_atomic( # /boot/ota-partition -> ota-partition. self.boot_dir / ota_partition_folder, @@ -705,7 +721,8 @@ def finalize_update_switch_boot(self): raise _GrubBootControllerError(_err_msg) from e def grub_reboot_to_standby(self): - """Temporarily boot to standby slot after OTA applied to standby slot.""" + """Temporarily boot to standby slot after OTA applied to standby + slot.""" # ensure all required symlinks for standby slot are presented and valid try: self._prepare_kernel_initrd_links(self.standby_ota_partition_folder) @@ -759,9 +776,12 @@ def __init__(self) -> None: raise ota_errors.BootControlStartupFailed(_err_msg, module=__name__) from e def _update_fstab(self, *, active_slot_fstab: Path, standby_slot_fstab: Path): - """Update standby fstab based on active slot's fstab and just installed new stanby fstab. + """Update standby fstab based on active slot's fstab and just installed + new stanby fstab. + + Override existed entries in standby fstab, merge new entries + from active fstab. - Override existed entries in standby fstab, merge new entries from active fstab. """ try: standby_uuid = CMDHelperFuncs.get_attrs_by_dev( @@ -840,7 +860,8 @@ def _cleanup_standby_ota_partition_folder(self): f.unlink(missing_ok=True) def _copy_boot_files_from_standby_slot(self): - """Copy boot files under /boot to standby ota-partition folder.""" + """Copy boot files under /boot to standby ota- + partition folder.""" standby_ota_partition_dir = self._ota_status_control.standby_ota_status_dir for f in self._mp_control.standby_boot_dir.iterdir(): if f.is_file() and not f.is_symlink(): diff --git a/otaclient/app/boot_control/_jetson_cboot.py b/otaclient/app/boot_control/_jetson_cboot.py index c857f7999..e110b92ba 100644 --- a/otaclient/app/boot_control/_jetson_cboot.py +++ b/otaclient/app/boot_control/_jetson_cboot.py @@ -99,6 +99,7 @@ class _NVBootctrl: """Helper for calling nvbootctrl commands. Without -t option, the target will be bootloader by default. + """ NVBOOTCTRL = "nvbootctrl" @@ -149,6 +150,7 @@ def get_standby_slot(cls, *, target: Optional[NVBootctrlTarget] = None) -> SlotI """Prints standby SLOT. NOTE: this method is implemented with nvbootctrl get-current-slot. + """ current_slot = cls.get_current_slot(target=target) return SlotID("0") if current_slot == "1" else SlotID("1") @@ -190,6 +192,7 @@ def is_unified_enabled(cls) -> bool | None: True for both unified A/B and rootfs A/B are enbaled, False for unified A/B disabled but rootfs A/B enabled, None for both disabled. + """ cmd = "is-unified-enabled" try: @@ -262,6 +265,7 @@ def verify_update(cls) -> subprocess.CompletedProcess[bytes]: Returns: A CompletedProcess object with the call result. + """ cmd = [cls.NV_UPDATE_ENGINE, "--verify"] return subprocess_run_wrapper(cmd, check=False, check_output=True) @@ -320,6 +324,7 @@ def parse_bsp_version(nv_tegra_release: str) -> BSPVersion: """Get current BSP version from contents of /etc/nv_tegra_release. see https://developer.nvidia.com/embedded/jetson-linux-archive for BSP version history. + """ ma = BSP_VER_PA.match(nv_tegra_release) assert ma, f"invalid nv_tegra_release content: {nv_tegra_release}" @@ -472,6 +477,7 @@ def external_rootfs_enabled(self) -> bool: NOTE: distiguish from boot from external storage, as R32.5 and below doesn't support native NVMe boot. + """ return self._external_rootfs @@ -552,11 +558,14 @@ def __init__(self) -> None: raise ota_errors.BootControlStartupFailed(_err_msg, module=__name__) from e def _finalize_switching_boot(self) -> bool: - """ - If firmware update failed(updated bootloader slot boot failed), clear the according slot's - firmware_bsp_version information to force firmware update in next OTA. - Also if unified A/B is NOT enabled and everything is alright, execute mark-boot-success - to mark the current booted rootfs boots successfully. + """If firmware update failed(updated bootloader slot boot failed), + clear the according slot's firmware_bsp_version information to force + firmware update in next OTA. + + Also if unified A/B is NOT enabled and everything is alright, + execute mark-boot-success to mark the current + booted rootfs boots successfully. + """ current_boot_slot = self._cboot_control.current_bootloader_slot current_rootfs_slot = self._cboot_control.current_rootfs_slot @@ -595,6 +604,7 @@ def _copy_standby_slot_boot_to_internal_emmc(self): WARNING: DO NOT call this method if we are not booted from external rootfs! NOTE: at the time this method is called, the /boot folder at standby slot rootfs MUST be fully setup! + """ # mount corresponding internal emmc device internal_emmc_mp = Path(cfg.SEPARATE_BOOT_MOUNT_POINT) @@ -637,7 +647,8 @@ def _preserve_ota_config_files_to_standby(self): copytree_identical(src, dst) def _update_standby_slot_extlinux_cfg(self): - """update standby slot's /boot/extlinux/extlinux.conf to update root indicator.""" + """Update standby slot's /boot/extlinux/extlinux.conf to update root + indicator.""" src = standby_slot_extlinux = self._mp_control.standby_slot_mount_point / Path( cfg.EXTLINUX_FILE ).relative_to("/") @@ -661,6 +672,7 @@ def _nv_firmware_update(self) -> Optional[bool]: Returns: True if firmware update applied, False for failed firmware update, None for no firmware update occurs. + """ logger.info("jetson-cboot: entering nv firmware update ...") standby_bootloader_slot = self._cboot_control.standby_bootloader_slot diff --git a/otaclient/app/boot_control/_rpi_boot.py b/otaclient/app/boot_control/_rpi_boot.py index 6f596624c..4a8a9e929 100644 --- a/otaclient/app/boot_control/_rpi_boot.py +++ b/otaclient/app/boot_control/_rpi_boot.py @@ -62,6 +62,7 @@ class _RPIBootControl: Boot files for each slot have the following naming format: _ i.e., config.txt for slot_a will be config.txt_slot_a + """ SLOT_A = cfg.SLOT_A_FSLABEL @@ -158,6 +159,7 @@ def _init_boot_files(self): If any of the required files are missing, BootControlInitError will be raised. In such case, a reinstall/setup of AB partition boot files is required. + """ logger.debug("checking boot files...") # boot file @@ -212,9 +214,8 @@ def _init_boot_files(self): ) def _update_firmware(self): - """Call flash-kernel to install new dtb files, boot firmwares and kernel, initrd.img - from current rootfs to system-boot partition. - """ + """Call flash-kernel to install new dtb files, boot firmwares and + kernel, initrd.img from current rootfs to system-boot partition.""" logger.info("update firmware with flash-kernel...") try: subprocess_call("flash-kernel", raise_exception=True) @@ -260,7 +261,8 @@ def active_slot_dev(self) -> str: return self._active_slot_dev def finalize_switching_boot(self) -> bool: - """Finalize switching boot by swapping config.txt and tryboot.txt if we should. + """Finalize switching boot by swapping config.txt and tryboot.txt if we + should. Finalize switch boot: 1. atomically replace tryboot.txt with tryboot.txt_standby_slot @@ -279,6 +281,7 @@ def finalize_switching_boot(self) -> bool: Returns: A bool indicates whether the switch boot succeeded or not. Note that no exception will be raised if finalizing failed. + """ logger.info("finalizing switch boot...") try: @@ -388,12 +391,14 @@ def __init__(self) -> None: raise ota_errors.BootControlStartupFailed(_err_msg, module=__name__) from e def _copy_kernel_for_standby_slot(self): - """Copy the kernel and initrd_img files from current slot /boot - to system-boot for standby slot. + """Copy the kernel and initrd_img files from current slot /boot to + system-boot for standby slot. - This method will checkout the vmlinuz- and initrd.img- - under /boot, and copy them to /boot/firmware(system-boot partition) under the name + This method will checkout the vmlinuz- and + initrd.img- under /boot, and copy them to + /boot/firmware(system-boot partition) under the name vmlinuz_ and initrd.img_. + """ logger.debug( "prepare standby slot's kernel/initrd.img to system-boot partition..." @@ -439,6 +444,7 @@ def _write_standby_fstab(self): The fstab file will contain 2 lines, one line for mounting rootfs, another line is for mounting system-boot partition. NOTE: slot id is the fslabel for slot's rootfs + """ logger.debug("update standby slot fstab file...") try: diff --git a/otaclient/app/boot_control/configs.py b/otaclient/app/boot_control/configs.py index d44862d56..a1c328c06 100644 --- a/otaclient/app/boot_control/configs.py +++ b/otaclient/app/boot_control/configs.py @@ -25,7 +25,7 @@ @dataclass class GrubControlConfig(BaseConfig): - """x86-64 platform, with grub as bootloader.""" + """X86-64 platform, with grub as bootloader.""" BOOTLOADER: BootloaderType = BootloaderType.GRUB FSTAB_FILE_PATH: str = "/etc/fstab" @@ -41,6 +41,7 @@ class JetsonCBootControlConfig(BaseConfig): """Jetson device booted with cboot. Suuports BSP version < R34. + """ BOOTLOADER: BootloaderType = BootloaderType.CBOOT diff --git a/otaclient/app/boot_control/protocol.py b/otaclient/app/boot_control/protocol.py index 9883aee47..db32e9777 100644 --- a/otaclient/app/boot_control/protocol.py +++ b/otaclient/app/boot_control/protocol.py @@ -25,10 +25,12 @@ class BootControllerProtocol(Protocol): @abstractmethod def get_booted_ota_status(self) -> wrapper.StatusOta: - """Get the ota_status loaded from status file during otaclient starts up. + """Get the ota_status loaded from status file during otaclient starts + up. + + This value is meant to be used only once during otaclient starts + up, to init the live_ota_status maintained by otaclient. - This value is meant to be used only once during otaclient starts up, - to init the live_ota_status maintained by otaclient. """ @abstractmethod @@ -40,12 +42,13 @@ def get_standby_boot_dir(self) -> Path: """Get the Path points to the standby slot's boot folder. NOTE(20230907): this will always return the path to - /boot. - DEPRECATED(20230907): standby slot creator doesn't need to - treat the files under /boot specially, it is - boot controller's responsibility to get the - kernel/initrd.img from standby slot and prepare - them to actual boot dir. + /boot. DEPRECATED(20230907): standby + slot creator doesn't need to treat the files + under /boot specially, it is boot controller's + responsibility to get the kernel/initrd.img from + standby slot and prepare them to actual boot + dir. + """ @abstractmethod diff --git a/otaclient/app/boot_control/selecter.py b/otaclient/app/boot_control/selecter.py index 6a771bae7..ddede3d5a 100644 --- a/otaclient/app/boot_control/selecter.py +++ b/otaclient/app/boot_control/selecter.py @@ -39,6 +39,7 @@ def detect_bootloader() -> BootloaderType: NOTE: this function is only used when ecu_info.yaml doesn't contains the bootloader information. + """ # evidence: assuming that all x86 device is using grub machine, arch = platform.machine(), platform.processor() diff --git a/otaclient/app/common.py b/otaclient/app/common.py index 611eeaa37..4d8eafa4c 100644 --- a/otaclient/app/common.py +++ b/otaclient/app/common.py @@ -126,6 +126,7 @@ def subprocess_run_wrapper( Returns: subprocess.CompletedProcess[bytes]: the result of the execution. + """ if isinstance(cmd, str): cmd = shlex.split(cmd) @@ -157,6 +158,7 @@ def subprocess_check_output( Returns: str: UTF-8 decoded stripped stdout. + """ try: res = subprocess_run_wrapper( @@ -187,6 +189,7 @@ def subprocess_call( cmd (str | list[str]): command to be executed. raise_exception (bool, optional): raise the underlying CalledProcessError. Defaults to False. timeout (Optional[float], optional): timeout for execution. Defaults to None. + """ try: subprocess_run_wrapper(cmd, check=True, check_output=False, timeout=timeout) @@ -232,6 +235,7 @@ def copytree_identical(src: Path, dst: Path): NOTE: is_file/is_dir also returns True if it is a symlink and the link target is_file/is_dir + """ if src.is_symlink() or not src.is_dir(): raise ValueError(f"{src} is not a dir") @@ -323,6 +327,7 @@ def re_symlink_atomic(src: Path, target: Union[Path, str]): NOTE: os.rename is atomic when src and dst are on the same filesystem under linux. NOTE 2: src should not exist or exist as file/symlink. + """ if not (src.is_symlink() and str(os.readlink(src)) == str(target)): tmp_link = Path(src).parent / f"tmp_link_{os.urandom(6).hex()}" @@ -338,6 +343,7 @@ def replace_atomic(src: Union[str, Path], dst: Union[str, Path]): """Atomically replace dst file with src file. NOTE: atomic is ensured by os.rename/os.replace under the same filesystem. + """ src, dst = Path(src), Path(dst) if not src.is_file(): @@ -409,9 +415,10 @@ def __init__( self._done_que: Queue[DoneTask] = Queue() def _done_task_cb(self, item: T, fut: Future): - """ - Tracking done counting, set all_done event. + """Tracking done counting, set all_done event. + add failed to failed list. + """ self._se.release() # always release se first # NOTE: don't change dispatched_tasks if shutdown_event is set @@ -431,8 +438,8 @@ def _done_task_cb(self, item: T, fut: Future): self._done_que.put_nowait(DoneTask(fut, item)) def _task_dispatcher(self, func: Callable[[T], Any], _iter: Iterable[T]): - """A dispatcher in a dedicated thread that dispatches - tasks to threadpool.""" + """A dispatcher in a dedicated thread that dispatches tasks to + threadpool.""" for item in _iter: if self._shutdown_event.is_set(): return @@ -575,6 +582,7 @@ def ensure_otaproxy_start( Raises: A ConnectionError if exceeds . + """ start_time = int(time.time()) next_warning = start_time + warning_interval @@ -606,11 +614,12 @@ def ensure_otaproxy_start( class PersistFilesHandler: """Preserving files in persist list from to . - Files being copied will have mode bits preserved, - and uid/gid preserved with mapping as follow: + Files being copied will have mode bits preserved, and uid/gid + preserved with mapping as follow: + + src_uid -> src_name -> dst_name -> dst_uid src_gid -> src_name -> + dst_name -> dst_gid - src_uid -> src_name -> dst_name -> dst_uid - src_gid -> src_name -> dst_name -> dst_gid """ def __init__( diff --git a/otaclient/app/configs.py b/otaclient/app/configs.py index fbd072d38..ecb43314e 100644 --- a/otaclient/app/configs.py +++ b/otaclient/app/configs.py @@ -47,6 +47,7 @@ class _InternalSettings: WARNING: typically the common settings SHOULD NOT be changed! otherwise the backward compatibility will be impact. Change the fields in BaseConfig if you want to tune the otaclient. + """ # ------ common paths ------ # diff --git a/otaclient/app/create_standby/common.py b/otaclient/app/create_standby/common.py index e5ffcaa3e..ecceab0a3 100644 --- a/otaclient/app/create_standby/common.py +++ b/otaclient/app/create_standby/common.py @@ -11,7 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -r"""Common used helpers, classes and functions for different bank creating methods.""" +r"""Common used helpers, classes and functions for different bank creating +methods.""" import logging @@ -98,6 +99,7 @@ def get_tracker( Returns: A hardlink tracker and a bool to indicates whether the caller is the writer or not. + """ with self._lock: _ref = self._hash_ref_dict.get(_identifier) @@ -266,6 +268,7 @@ def _prepare_local_copy_from_active_slot( verification process faster. NOTE: verify the file before copying to the standby slot! + """ # if we have information related to this file in advance(with saved # OTA image meta for the active slot), use this information to diff --git a/otaclient/app/create_standby/interface.py b/otaclient/app/create_standby/interface.py index 41f5e5709..9217df6cb 100644 --- a/otaclient/app/create_standby/interface.py +++ b/otaclient/app/create_standby/interface.py @@ -11,7 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -r"""create_standby package provide the feature of applying update to standby slot. +r"""create_standby package provide the feature of applying update to standby +slot. This package has two main jobs: 1. calculate and prepare delta against target image and local running image @@ -27,6 +28,7 @@ don't present locally), 4. upper caller(otaclient) calls the create_standby_slot method to apply update to the standby slot. + """ @@ -62,5 +64,5 @@ def calculate_and_prepare_delta(self) -> DeltaBundle: ... @classmethod @abstractmethod def should_erase_standby_slot(cls) -> bool: - """Tell whether standby slot should be erased - under this standby slot creating mode.""" + """Tell whether standby slot should be erased under this standby slot + creating mode.""" diff --git a/otaclient/app/create_standby/rebuild_mode.py b/otaclient/app/create_standby/rebuild_mode.py index ae7d26912..004018bef 100644 --- a/otaclient/app/create_standby/rebuild_mode.py +++ b/otaclient/app/create_standby/rebuild_mode.py @@ -179,6 +179,7 @@ def create_standby_slot(self): """Apply changes to the standby slot. This method should be called before calculate_and_prepare_delta. + """ self._process_dirs() self._process_regulars() diff --git a/otaclient/app/downloader.py b/otaclient/app/downloader.py index 41cdaf1fd..0f6cabbe6 100644 --- a/otaclient/app/downloader.py +++ b/otaclient/app/downloader.py @@ -79,6 +79,7 @@ class UnhandledHTTPError(DownloadError): """HTTPErrors that cannot be handled by us. Currently include 403 and 404. + """ @@ -103,8 +104,10 @@ class DownloadFailedSpaceNotEnough(DownloadError): class UnhandledRequestException(DownloadError): - """requests exception that we didn't cover. + """Requests exception that we didn't cover. + If we get this exc, we should consider handle it. + """ @@ -112,7 +115,8 @@ class UnhandledRequestException(DownloadError): def _transfer_invalid_retrier(retries: int, backoff_factor: float, backoff_max: int): - """Retry mechanism that covers interruption/validation failed of data transfering. + """Retry mechanism that covers interruption/validation failed of data + transfering. Retry mechanism applied on requests only retries during making connection to remote server, this retry method retries on the transfer interruption and/or recevied data invalid. @@ -123,6 +127,7 @@ def _transfer_invalid_retrier(retries: int, backoff_factor: float, backoff_max: NOTE: retry on errors during/after data transfering, which are ChunkStreamingError and HashVerificationError. NOTE: also cover the unhandled requests errors. + """ def _decorator(func: Callable[P, T]) -> Callable[P, T]: @@ -181,8 +186,9 @@ class DecompressionAdapterProtocol(Protocol): def iter_chunk(self, src_stream: Union[IO[bytes], ByteString]) -> Iterator[bytes]: """Decompresses the source stream. - This Method take a src_stream of compressed file and - return another stream that yields decompressed data chunks. + This Method take a src_stream of compressed file and return + another stream that yields decompressed data chunks. + """ @@ -379,7 +385,8 @@ def _prepare_header( """Inject ota-file-cache-control header if digest is available. Currently this method preserves the input_header, while - updating/injecting Ota-File-Cache-Control header. + updating/injecting Ota-File-Cache-Control header. + """ # NOTE: only inject ota-file-cache-control-header if we have upper otaproxy, # or we have information to inject @@ -406,13 +413,15 @@ def _check_against_cache_policy_in_resp( compression_alg: Optional[str], resp_headers: CIDict, ) -> Tuple[Optional[str], Optional[str]]: - """Checking digest and compression_alg against cache_policy from resp headers. + """Checking digest and compression_alg against cache_policy from resp + headers. If upper responds with file_sha256 and file_compression_alg by ota-file-cache-control header, use these information, otherwise use the information provided by client. Returns: A tuple of file_sha256 and file_compression_alg for the requested resources. + """ if not (cache_policy_str := resp_headers.get(CACHE_CONTROL_HEADER, None)): return digest, compression_alg @@ -440,8 +449,9 @@ def _check_against_cache_policy_in_resp( def _prepare_url(self, url: str, proxies: Optional[Dict[str, str]] = None) -> str: """Force changing URL scheme to HTTP if required. - When upper otaproxy is set and use_http_if_http_proxy_set is True, - Force changing the URL scheme to HTTP. + When upper otaproxy is set and use_http_if_http_proxy_set is + True, Force changing the URL scheme to HTTP. + """ if self.use_http_if_http_proxy_set and proxies and "http" in proxies: return urlsplit(url)._replace(scheme="http").geturl() @@ -549,8 +559,8 @@ def download_retry_inf( inactive_timeout: int = cfg.DOWNLOAD_GROUP_INACTIVE_TIMEOUT, **kwargs, ) -> Tuple[int, int, int]: - """A wrapper that dispatch download task to threadpool, and keep retrying until - downloading succeeds or exceeded inactive_timeout.""" + """A wrapper that dispatch download task to threadpool, and keep + retrying until downloading succeeds or exceeded inactive_timeout.""" retry_count = 0 while not self.shutdowned.is_set(): try: diff --git a/otaclient/app/errors.py b/otaclient/app/errors.py index 83408009f..899b74247 100644 --- a/otaclient/app/errors.py +++ b/otaclient/app/errors.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""OTA error code definition""" +"""OTA error code definition.""" import traceback @@ -64,8 +64,10 @@ def to_errcode_str(self) -> str: class OTAError(Exception): """Errors that happen during otaclient code executing. - This exception class should be the base module level exception for each module. - It should always be captured by the OTAError at otaclient.py. + This exception class should be the base module level exception for + each module. It should always be captured by the OTAError at + otaclient.py. + """ ERROR_PREFIX: ClassVar[str] = "E" @@ -116,7 +118,7 @@ def get_error_report(self, title: str = "") -> str: class NetworkError(OTAError): - """Generic network error""" + """Generic network error.""" failure_type: wrapper.FailureType = wrapper.FailureType.RECOVERABLE failure_errcode: OTAErrorCode = OTAErrorCode.E_NETWORK diff --git a/otaclient/app/log_setting.py b/otaclient/app/log_setting.py index cdc82ab3e..12acae1f9 100644 --- a/otaclient/app/log_setting.py +++ b/otaclient/app/log_setting.py @@ -30,7 +30,8 @@ class _LogTeeHandler(logging.Handler): - """Implementation of teeing local logs to a remote otaclient-iot-logger server.""" + """Implementation of teeing local logs to a remote otaclient-iot-logger + server.""" def __init__(self, max_backlog: int = 2048) -> None: super().__init__() diff --git a/otaclient/app/ota_client.py b/otaclient/app/ota_client.py index 0c3879f1d..656ddeb2d 100644 --- a/otaclient/app/ota_client.py +++ b/otaclient/app/ota_client.py @@ -57,12 +57,10 @@ class OTAClientControlFlags: - """ - When self ECU's otaproxy is enabled, all the child ECUs of this ECU - and self ECU OTA update will depend on its otaproxy, we need to - control when otaclient can start its downloading/reboot with considering - whether local otaproxy is started/required. - """ + """When self ECU's otaproxy is enabled, all the child ECUs of this ECU and + self ECU OTA update will depend on its otaproxy, we need to control when + otaclient can start its downloading/reboot with considering whether local + otaproxy is started/required.""" def __init__(self) -> None: self._can_reboot = threading.Event() @@ -129,7 +127,8 @@ def __init__( # helper methods def _download_files(self, download_list: Iterator[wrapper.RegularInf]): - """Download all needed OTA image files indicated by calculated bundle.""" + """Download all needed OTA image files indicated by calculated + bundle.""" def _download_file(entry: wrapper.RegularInf) -> RegInfProcessedStats: """Download single OTA image file.""" @@ -341,6 +340,7 @@ def _execute_update( "CloudFront-Signature": "o4ojzMrJwtSIg~izsy...", "CloudFront-Key-Pair-Id": "K2...", } + """ logger.info(f"execute local update: {version=},{raw_url_base=}") logger.debug(f"{cookies_json=}") @@ -491,6 +491,7 @@ def execute( """Main entry for executing local OTA update. Handles OTA failure and logging/finalizing on failure. + """ try: self._execute_update(version, raw_url_base, cookies_json) diff --git a/otaclient/app/ota_client_stub.py b/otaclient/app/ota_client_stub.py index 68ff0217c..c1c20b637 100644 --- a/otaclient/app/ota_client_stub.py +++ b/otaclient/app/ota_client_stub.py @@ -71,7 +71,9 @@ def __init__( def extra_kwargs(self) -> Dict[str, Any]: """Inject kwargs to otaproxy startup entry. - Currently only inject if external cache storage is used. + Currently only inject if external cache storage + is used. + """ _res = {} if self.external_cache_enabled and self._external_cache_activated: @@ -242,6 +244,7 @@ async def stop(self): NOTE: This method only shutdown the otaproxy process, it will not cleanup the cache dir. cache dir cleanup is handled by other mechanism. Check cleanup_cache_dir API for more details. + """ if not self.enabled or self._lock.locked() or not self.is_running: return @@ -302,6 +305,7 @@ class ECUStatusStorage: If ECU has been disconnected(doesn't respond to status probing) longer than , it will be treated as UNREACHABLE and listed in , further being excluded when generating any_requires_network, all_success, in_update_ecus_id, failed_ecus_id, success_ecus_id and in_update_childecus_id. + """ DELAY_OVERALL_STATUS_REPORT_UPDATE = ( @@ -390,6 +394,7 @@ async def _generate_overall_status_report(self): """Generate overall status report against tracked active OTA ECUs. NOTE: as special case, lost_ecus set is calculated against all reachable ECUs. + """ self.properties_last_update_timestamp = cur_timestamp = int(time.time()) @@ -499,6 +504,7 @@ async def _loop_updating_properties(self): 2. if just receive update request, skip generating new overall status report for seconds to prevent pre-mature status change. check on_receive_update_request method below for more details. + """ last_storage_update = self.storage_last_updated_timestamp while not self._debug_properties_update_shutdown_event.is_set(): @@ -520,7 +526,8 @@ async def _loop_updating_properties(self): # API async def update_from_child_ecu(self, status_resp: wrapper.StatusResponse): - """Update the ECU status storage with child ECU's status report(StatusResponse).""" + """Update the ECU status storage with child ECU's status + report(StatusResponse).""" async with self._writer_lock: self.storage_last_updated_timestamp = cur_timestamp = int(time.time()) @@ -546,7 +553,8 @@ async def update_from_child_ecu(self, status_resp: wrapper.StatusResponse): self._all_ecus_status_v2.pop(ecu_id, None) async def update_from_local_ecu(self, ecu_status: wrapper.StatusResponseEcuV2): - """Update ECU status storage with local ECU's status report(StatusResponseEcuV2).""" + """Update ECU status storage with local ECU's status + report(StatusResponseEcuV2).""" async with self._writer_lock: self.storage_last_updated_timestamp = cur_timestamp = int(time.time()) @@ -555,7 +563,8 @@ async def update_from_local_ecu(self, ecu_status: wrapper.StatusResponseEcuV2): self._all_ecus_last_contact_timestamp[ecu_id] = cur_timestamp async def on_ecus_accept_update_request(self, ecus_accept_update: Set[str]): - """Update overall ECU status report directly on ECU(s) accept OTA update request. + """Update overall ECU status report directly on ECU(s) accept OTA + update request. for the ECUs that accepts OTA update request, we: 1. add these ECUs' id into in_update_ecus_id set @@ -567,6 +576,7 @@ async def on_ecus_accept_update_request(self, ecus_accept_update: Set[str]): To prevent pre-mature overall status change(for example, the child ECU doesn't change their ota_status to UPDATING on-time due to status polling interval mismatch), the above set value will be kept for seconds. + """ async with self._properties_update_lock: self._tracked_active_ecus = _OrderedSet(ecus_accept_update) @@ -590,6 +600,7 @@ def get_polling_interval(self) -> int: NOTE: use get_polling_waiter if want to wait, only call this method if one only wants to get the polling interval value. + """ return ( self.ACTIVE_POLLING_INTERVAL @@ -601,11 +612,12 @@ def get_polling_waiter(self): """Get a executable async waiter which waiting time is decided by whether there is active OTA updating or not. - This waiter will wait for and then return - if there is active OTA in the cluster. - Or if the cluster doesn't have active OTA, wait - or self.active_ota_update_present is set, return when one of the - condition is met. + This waiter will wait for and then + return if there is active OTA in the cluster. Or if the + cluster doesn't have active OTA, wait + or self.active_ota_update_present is set, return when one of the + condition is met. + """ async def _waiter(): @@ -624,7 +636,8 @@ async def _waiter(): return _waiter async def export(self) -> wrapper.StatusResponse: - """Export the contents of this storage to an instance of StatusResponse. + """Export the contents of this storage to an instance of + StatusResponse. NOTE: wrapper.StatusResponse's add_ecu method already takes care of v1 format backward-compatibility(input v2 format will result in @@ -632,6 +645,7 @@ async def export(self) -> wrapper.StatusResponse: NOTE: to align with preivous behavior that disconnected ECU should have no entry in status API response, simulate this behavior by skipping disconnected ECU's status report entry. + """ res = wrapper.StatusResponse() @@ -712,6 +726,7 @@ class OTAClientServiceStub: """Handlers for otaclient service API. This class also handles otaproxy lifecyle and dependence managing. + """ OTAPROXY_SHUTDOWN_DELAY = cfg.OTAPROXY_MINIMUM_SHUTDOWN_INTERVAL @@ -767,6 +782,7 @@ async def _otaproxy_lifecycle_managing(self): NOTE: cache_dir cleanup is handled here, when all ECUs are in SUCCESS ota_status, cache_dir will be removed. + """ otaproxy_last_launched_timestamp = 0 while not self._debug_status_checking_shutdown_event.is_set(): @@ -798,6 +814,7 @@ async def _otaclient_control_flags_managing(self): Prevent self ECU from rebooting when their is at least one ECU under UPDATING ota_status. + """ while not self._debug_status_checking_shutdown_event.is_set(): _can_reboot = self._otaclient_control_flags.is_can_reboot_flag_set() diff --git a/otaclient/app/ota_metadata.py b/otaclient/app/ota_metadata.py index c3d6fa487..c2c17b45d 100644 --- a/otaclient/app/ota_metadata.py +++ b/otaclient/app/ota_metadata.py @@ -85,8 +85,10 @@ class MetadataJWTVerificationFailed(Exception): class MetafilesV1(str, Enum): - """version1 text based ota metafiles fname. + """Version1 text based ota metafiles fname. + Check _MetadataJWTClaimsLayout for more details. + """ # entry_name: regular @@ -109,8 +111,9 @@ class MetaFieldDescriptor(Generic[FV]): """Field descriptor for dataclass _MetadataJWTClaimsLayout. This descriptor takes one dict from parsed metadata.jwt, parses - and assigns it to each field in _MetadataJWTClaimsLayout. - Check __set__ method's docstring for more details. + and assigns it to each field in _MetadataJWTClaimsLayout. Check + __set__ method's docstring for more details. + """ HASH_KEY = "hash" # version1 @@ -211,6 +214,7 @@ class _MetadataJWTParser: - NOTE: If there is no root or intermediate certificate, certification verification will be skipped! This SHOULD ONLY happen at non-production environment! + """ HASH_ALG = "sha256" @@ -242,6 +246,7 @@ def _verify_metadata_cert(self, metadata_cert: bytes) -> None: Raises: Raise MetadataJWTVerificationFailed on verification failed. + """ ca_set_prefix = set() # e.g. under _certs_dir: A.1.pem, A.2.pem, B.1.pem, B.2.pem @@ -291,6 +296,7 @@ def _verify_metadata(self, metadata_cert: bytes): Raises: Raise MetadataJWTVerificationFailed on validation failed. + """ try: cert = crypto.load_certificate(crypto.FILETYPE_PEM, metadata_cert) @@ -311,6 +317,7 @@ def get_otametadata(self) -> "_MetadataJWTClaimsLayout": Returns: A instance of OTAMetaData representing the parsed(but not yet verified) metadata. + """ return self.ota_metadata @@ -319,6 +326,7 @@ def verify_metadata(self, metadata_cert: bytes): Raises: Raise MetadataJWTVerificationFailed on verification failed. + """ # step1: verify the cert itself against local pinned CA cert self._verify_metadata_cert(metadata_cert) @@ -377,6 +385,7 @@ def check_metadata_version(cls, claims: List[Dict[str, Any]]): Warnings will be issued for the following 2 cases: 1. field is missing, 2. doesn't match + """ version_field_found = False for entry in claims: @@ -404,8 +413,8 @@ def parse_payload(cls, payload: Union[str, bytes], /) -> Self: return res def assign_fields(self, claims: List[Dict[str, Any]]): - """Assign each fields in _MetadataJWTClaimsLayout with input claims - in parsed metadata.jwt.""" + """Assign each fields in _MetadataJWTClaimsLayout with input claims in + parsed metadata.jwt.""" for field in fields(self): # NOTE: default value for each field in dataclass is the descriptor # NOTE: skip non-metafield field diff --git a/otaclient/app/proto/__init__.py b/otaclient/app/proto/__init__.py index 82b5169a5..8ec1375d7 100644 --- a/otaclient/app/proto/__init__.py +++ b/otaclient/app/proto/__init__.py @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - """Packed compiled protobuf files for otaclient.""" import importlib.util import sys @@ -50,6 +48,7 @@ def _import_proto(*module_fpaths: Path): NOTE: compiled protobuf files under proto folder will be imported as modules to the global namespace. + """ for _fpath in module_fpaths: _module_name, _module = _import_from_file(_fpath) # noqa: F821 diff --git a/otaclient/app/proto/_common.py b/otaclient/app/proto/_common.py index 38dd48db7..bf07cc56c 100644 --- a/otaclient/app/proto/_common.py +++ b/otaclient/app/proto/_common.py @@ -65,8 +65,8 @@ def _reveal_origin_type(tp: Type[_T]) -> Type[_T]: - """Return the actual type from generic alias, - or return as it if input type is not generic alias.""" + """Return the actual type from generic alias, or return as it if input type + is not generic alias.""" if _origin := get_origin(tp): return _origin elif isinstance(tp, type): @@ -77,9 +77,10 @@ def _reveal_origin_type(tp: Type[_T]) -> Type[_T]: def calculate_slots(_proto_msg_type: Type[_pb_Message]) -> List[str]: """Calculate the __slots__ for input proto message type. - Since we are using field descriptors in wrapper creating, attribute values - are not stored in the actual field name. This function creates the slots - with the actual attribute name for each field. + Since we are using field descriptors in wrapper creating, attribute + values are not stored in the actual field name. This function + creates the slots with the actual attribute name for each field. + """ _field_names = list(_proto_msg_type.DESCRIPTOR.fields_by_name) return [_get_field_attrn(_fn) for _fn in _field_names] @@ -94,12 +95,12 @@ class WrapperBase(Generic[_T], ABC): @classmethod @abstractmethod def convert(cls, _in: _T, /, **kwargs): - """Convert""" + """Convert.""" @abstractmethod def export_pb(self) -> _T: - """Export itself to protobuf types, or containers that hold - protobuf types.""" + """Export itself to protobuf types, or containers that hold protobuf + types.""" @abstractmethod def export_to_pb(self, pb_msg: _pb_Message, field_name: str): @@ -341,9 +342,10 @@ def _create_field_descriptor(field_annotation: Any) -> Optional[_FieldBase]: class _FieldBase(Generic[_T], ABC): """Base for message field descriptor. - _T stands for the scalar value type for scalar value field, - or converter type for non-scalar value field - (message, enum, repeated field, etc). + _T stands for the scalar value type for scalar value field, or + converter type for non-scalar value field (message, enum, + repeated field, etc). + """ @abstractmethod @@ -371,9 +373,8 @@ def __set_name__(self, owner: type, name: str): self._attrn = _get_field_attrn(name) def export_to_pb(self, obj, pb_msg: _pb_Message): - """Stub method for calling the underlaying wrapper types - export_to_pb method. - """ + """Stub method for calling the underlaying wrapper types export_to_pb + method.""" _wrapper: WrapperBase = getattr(obj, self._attrn) _wrapper.export_to_pb(pb_msg, self.field_name) @@ -415,9 +416,11 @@ def __set__(self, obj, value: Any) -> None: class _EnumField(_FieldBase[EnumWrapperType]): """For field that contains one enum value. - Basically we can handle enum like handling a normal message instance, - but parsing from/exporting to protobuf enum requires special treatment, - so separate _EnumField descriptor is defined for enum field. + Basically we can handle enum like handling a normal message + instance, but parsing from/exporting to protobuf enum requires + special treatment, so separate _EnumField descriptor is defined + for enum field. + """ def __init__(self, field_annotation: Any) -> None: @@ -580,10 +583,12 @@ class MessageWrapper(WrapperBase[MessageType]): # internal def __init_subclass__(cls) -> None: - """Special treatment for every user defined protobuf message wrapper types. + """Special treatment for every user defined protobuf message wrapper + types. - Parse type annotations defined in wrapper class. - bypass the user defined __init__. + """ if not (_orig_bases := getattr(cls, "__orig_bases__")) or len(_orig_bases) < 1: raise TypeError("MessageWrapper should have type arg") @@ -727,8 +732,8 @@ def converted_from_deserialized(cls, _bytes: bytes, /) -> Self: class _DefaultValueEnumMeta(EnumMeta): - """Align the protobuf enum behavior that the default value is - the first enum in defined order(typically 0 at runtime).""" + """Align the protobuf enum behavior that the default value is the first + enum in defined order(typically 0 at runtime).""" def __call__(cls, *args, **kwargs): if not args and not kwargs: @@ -767,6 +772,7 @@ class Duration(MessageWrapper[_Duration]): """Wrapper for protobuf well-known type Duration. NOTE: this wrapper supports directly adding nanoseconds. + """ __slots__ = calculate_slots(_Duration) diff --git a/otaclient/app/proto/_ota_metafiles_wrapper.py b/otaclient/app/proto/_ota_metafiles_wrapper.py index bca6a24f8..2129001f7 100644 --- a/otaclient/app/proto/_ota_metafiles_wrapper.py +++ b/otaclient/app/proto/_ota_metafiles_wrapper.py @@ -100,7 +100,8 @@ def get_hash(self) -> str: return self.sha256hash.hex() def relatively_join(self, mount_point: Union[Path, str]) -> str: - """Return a path string relative to / or /boot and joined to .""" + """Return a path string relative to / or /boot and joined to + .""" if self.path.startswith("/boot"): return os.path.join(mount_point, os.path.relpath(self.path, "/boot")) else: @@ -109,7 +110,8 @@ def relatively_join(self, mount_point: Union[Path, str]) -> str: def copy_relative_to_mount_point( self, dst_mount_point: Union[Path, str], /, *, src_mount_point: Union[Path, str] ): - """Copy file to the path that relative to dst_mount_point, from src_mount_point.""" + """Copy file to the path that relative to dst_mount_point, from + src_mount_point.""" _src = self.relatively_join(src_mount_point) _dst = self.relatively_join(dst_mount_point) shutil.copy2(_src, _dst, follow_symlinks=False) diff --git a/otaclient/app/proto/_otaclient_v2_pb2_wrapper.py b/otaclient/app/proto/_otaclient_v2_pb2_wrapper.py index a4de6c2fa..71affdc7c 100644 --- a/otaclient/app/proto/_otaclient_v2_pb2_wrapper.py +++ b/otaclient/app/proto/_otaclient_v2_pb2_wrapper.py @@ -91,7 +91,8 @@ def iter_ecu_v2(self) -> _Iterable[ECUType]: class ECUStatusSummary(_Protocol): - """Common status summary protocol for StatusResponseEcu and StatusResponseEcuV2.""" + """Common status summary protocol for StatusResponseEcu and + StatusResponseEcuV2.""" @property @abstractmethod @@ -111,7 +112,8 @@ def is_success(self) -> bool: @property @abstractmethod def requires_network(self) -> bool: - """If this ECU is in UPDATING and requires network connection for OTA.""" + """If this ECU is in UPDATING and requires network connection for + OTA.""" # enum diff --git a/otaclient/app/proto/otaclient_v2_pb2_grpc.py b/otaclient/app/proto/otaclient_v2_pb2_grpc.py index 9d15fd0f9..be4f2a240 100644 --- a/otaclient/app/proto/otaclient_v2_pb2_grpc.py +++ b/otaclient/app/proto/otaclient_v2_pb2_grpc.py @@ -6,7 +6,9 @@ class OtaClientServiceStub(object): """The OTA Client service definition. + Style Guide: https://developers.google.com/protocol-buffers/docs/style#message_and_field_names + """ def __init__(self, channel): @@ -14,6 +16,7 @@ def __init__(self, channel): Args: channel: A grpc.Channel. + """ self.Update = channel.unary_unary( "/OtaClientV2.OtaClientService/Update", @@ -34,12 +37,14 @@ def __init__(self, channel): class OtaClientServiceServicer(object): """The OTA Client service definition. + Style Guide: https://developers.google.com/protocol-buffers/docs/style#message_and_field_names + """ def Update(self, request, context): - """ - `Update` service requests OTA client to start updating. + """`Update` service requests OTA client to start updating. + The OTA client of each ECU retrieves the request that matches its own ECU id and starts it. Requests to each ECU included in the `UpdateRequest` are handled by that respective ECU and returns the response to the parent ECU. @@ -47,24 +52,24 @@ def Update(self, request, context): After requesting `Update` and if the OTA status is `UPDATING`, the request is successful. Note that if the child ECU doesn't respond, the grandchild response is not included by `UpdateResponse`. + """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def Rollback(self, request, context): - """ - NOT YET - """ + """NOT YET.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def Status(self, request, context): - """ - `Status` service requests OTA client to retrieve OTA client status. + """`Status` service requests OTA client to retrieve OTA client status. + Note that if the child ECU doesn't respond, the grandchild response is not contained by `StatusResponse`. + """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") @@ -98,7 +103,9 @@ def add_OtaClientServiceServicer_to_server(servicer, server): # This class is part of an EXPERIMENTAL API. class OtaClientService(object): """The OTA Client service definition. + Style Guide: https://developers.google.com/protocol-buffers/docs/style#message_and_field_names + """ @staticmethod diff --git a/otaclient/app/proto/streamer.py b/otaclient/app/proto/streamer.py index b35943bec..8ec4d141c 100644 --- a/otaclient/app/proto/streamer.py +++ b/otaclient/app/proto/streamer.py @@ -11,13 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""uint32 len delimited streamer. +"""Uint32 len delimited streamer. The length-delimited stream layout is as follow: ---------------------------------------------------- | len(msg_1) | msg_1 | len(msg_2) | msg_2 | ... ----------------------------------------------------- Which will be 4 bytes unsigned int in big-endian layout. + """ diff --git a/otaclient/app/update_stats.py b/otaclient/app/update_stats.py index c01ee6cda..d87874df9 100644 --- a/otaclient/app/update_stats.py +++ b/otaclient/app/update_stats.py @@ -68,6 +68,7 @@ def _staging_changes(self) -> Generator[UpdateStatus, None, None]: """Acquire a staging storage for updating the slot atomically. NOTE: it should be only one collecter that calling this method! + """ staging_slot = self.store.get_snapshot() try: @@ -122,6 +123,7 @@ def report_apply_delta(self, stats_list: List[RegInfProcessedStats]): unconditionally pop one stat from the stats_list because the preparation of first copy is already recorded (either by picking up local copy(keep_delta) or downloading) + """ for _stat in stats_list[1:]: self._que.put_nowait(_stat) diff --git a/otaclient/configs/__init__.py b/otaclient/configs/__init__.py index 55ed90a28..f4afe50c5 100644 --- a/otaclient/configs/__init__.py +++ b/otaclient/configs/__init__.py @@ -11,4 +11,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""otaclient configs package.""" +"""Otaclient configs package.""" diff --git a/otaclient/configs/ecu_info.py b/otaclient/configs/ecu_info.py index 54a28f95e..f072518ee 100644 --- a/otaclient/configs/ecu_info.py +++ b/otaclient/configs/ecu_info.py @@ -41,6 +41,7 @@ class BootloaderType(str, Enum): cboot: ADLink rqx-580, rqx-58g, with BSP 32.5.x. (theoretically other Nvidia jetson xavier devices using cboot are also supported) rpi_boot: raspberry pi 4 with eeprom version newer than 2020-10-28(with tryboot support). + """ AUTO_DETECT = "auto_detect" @@ -80,6 +81,7 @@ class ECUInfo(BaseFixedConfig): bootloader: the bootloader type of this ECU. available_ecu_ids: a list of ECU IDs that should be involved in OTA campaign. secondaries: a list of ECUContact objects for sub ECUs. + """ format_version: int = 1 diff --git a/otaclient/configs/proxy_info.py b/otaclient/configs/proxy_info.py index 3ed4d4c3e..05eb15c49 100644 --- a/otaclient/configs/proxy_info.py +++ b/otaclient/configs/proxy_info.py @@ -47,6 +47,7 @@ class ProxyInfo(BaseFixedConfig): upper_ota_proxy: the URL of upper OTA proxy used by local ota_proxy server or otaclient(proxy chain). logging_server: the URL of AWS IoT otaclient logs upload server. + """ format_version: int = 1 @@ -94,9 +95,11 @@ def get_proxy_for_local_ota(self) -> str | None: def gateway_otaproxy(self) -> bool: """Whether this local otaproxy is a gateway otaproxy. - Evidence is if no upper_ota_proxy, then this otaproxy should act as a gateway. - NOTE(20240202): this replaces the previous user-configurable gateway field in - the proxy_info.yaml. + Evidence is if no upper_ota_proxy, then this otaproxy should act + as a gateway. NOTE(20240202): this replaces the previous user- + configurable gateway field in the + proxy_info.yaml. + """ return not bool(self.upper_ota_proxy) diff --git a/otaclient/ota_proxy/__init__.py b/otaclient/ota_proxy/__init__.py index f0eb18cbd..a0a550f77 100644 --- a/otaclient/ota_proxy/__init__.py +++ b/otaclient/ota_proxy/__init__.py @@ -118,10 +118,11 @@ def subprocess_otaproxy_launcher( def _inner(*args: _P.args, **kwargs: _P.kwargs) -> SpawnProcess: """Helper method to launch otaproxy in subprocess. - This method works like a wrapper and passthrough all args and kwargs - to the _subprocess_main function, and then execute the function in - a subprocess. - check _subprocess_main function for more details. + This method works like a wrapper and passthrough all args and + kwargs to the _subprocess_main function, and then execute the + function in a subprocess. check _subprocess_main function for + more details. + """ # prepare otaproxy coro _otaproxy_entry = partial(otaproxy_entry, *args, **kwargs) diff --git a/otaclient/ota_proxy/cache_control.py b/otaclient/ota_proxy/cache_control.py index 2ec4ceeca..0611cf944 100644 --- a/otaclient/ota_proxy/cache_control.py +++ b/otaclient/ota_proxy/cache_control.py @@ -60,6 +60,7 @@ class OTAFileCacheControl(_HeaderDef): and retry caching when presented, file_sha256: the hash value of the original requested OTA file file_compression_alg: the compression alg used for the OTA file + """ @classmethod @@ -105,6 +106,7 @@ def update_header_str(cls, _input: str, **kwargs) -> str: 2. retry_caching 3. file_sha256 4. file_compression_alg + """ _fields: Dict[str, type] = getattr(cls, _FIELDS) _parsed_directives = {} diff --git a/otaclient/ota_proxy/db.py b/otaclient/ota_proxy/db.py index 3858bee82..0f5275fc2 100644 --- a/otaclient/ota_proxy/db.py +++ b/otaclient/ota_proxy/db.py @@ -33,7 +33,7 @@ class CacheMeta(ORMBase): - """revision 4 + """Revision 4. url: unquoted URL from the request of this cache entry. bucket_id: the LRU bucket this cache entry in. @@ -45,6 +45,7 @@ class CacheMeta(ORMBase): file_compression_alg: the compression used for the cached OTA file entry, if no compression is applied, then empty. content_encoding: the content_encoding header string comes with resp from remote server. + """ file_sha256: ColumnDescriptor[str] = ColumnDescriptor( @@ -68,7 +69,9 @@ class CacheMeta(ORMBase): def export_headers_to_client(self) -> Dict[str, str]: """Export required headers for client. - Currently includes content-type, content-encoding and ota-file-cache-control headers. + Currently includes content-type, content-encoding and ota-file- + cache-control headers. + """ res = {} if self.content_encoding: @@ -121,9 +124,8 @@ def check_db_file(cls, db_file: Union[str, Path]) -> bool: @classmethod def init_db_file(cls, db_file: Union[str, Path]): - """ - Purge the old db file and init the db files, creating table in it. - """ + """Purge the old db file and init the db files, creating table in + it.""" # remove the db file first Path(db_file).unlink(missing_ok=True) try: @@ -155,6 +157,7 @@ def __init__(self, db_file: Union[str, Path]): Raises: ValueError on invalid ota_cache db file, sqlite3.Error if optimization settings applied failed. + """ self._con = sqlite3.connect( db_file, @@ -194,6 +197,7 @@ def remove_entries(self, fd: ColumnDescriptor[FV], *_inputs: FV) -> int: Returns: returns affected rows count. + """ if not _inputs: return 0 @@ -224,6 +228,7 @@ def lookup_entry( Returns: An instance of CacheMeta representing the cache entry, or None if lookup failed. + """ if not CacheMeta.contains_field(fd): return @@ -253,6 +258,7 @@ def insert_entry(self, *cache_meta: CacheMeta) -> int: Returns: Returns inserted rows count. + """ if not cache_meta: return 0 @@ -267,6 +273,7 @@ def lookup_all(self) -> List[CacheMeta]: Returns: A list of CacheMeta instances representing each entry. + """ with self._con as con: cur = con.execute(f"SELECT * FROM {self.TABLE_NAME}", ()) @@ -282,6 +289,7 @@ def rotate_cache(self, bucket_idx: int, num: int) -> Optional[List[str]]: Return: A list of OTA file's hashes that needed to be deleted for space reserving, or None if no enough entries for space reserving. + """ bucket_fn, last_access_fn = ( CacheMeta.bucket_idx.name, @@ -328,12 +336,13 @@ def rotate_cache(self, bucket_idx: int, num: int) -> Optional[List[str]]: class _ProxyBase: - """A proxy class base for OTACacheDB that dispatches all requests into a threadpool.""" + """A proxy class base for OTACacheDB that dispatches all requests into a + threadpool.""" DB_THREAD_POOL_SIZE = 1 def _thread_initializer(self, db_f): - """Init a db connection for each thread worker""" + """Init a db connection for each thread worker.""" # NOTE: set init to False always as we only operate db when using proxy self._thread_local.db = OTACacheDB(db_f) diff --git a/otaclient/ota_proxy/orm.py b/otaclient/ota_proxy/orm.py index 023594693..ff142c095 100644 --- a/otaclient/ota_proxy/orm.py +++ b/otaclient/ota_proxy/orm.py @@ -48,13 +48,14 @@ def __new__(cls, *args, **kwargs) -> None: class ColumnDescriptor(Generic[FV]): - """ColumnDescriptor represents a column in a sqlite3 table, - implemented the python descriptor protocol. + """ColumnDescriptor represents a column in a sqlite3 table, implemented the + python descriptor protocol. + + When accessed as attribute of TableCls(subclass of ORMBase) + instance, it will return the value of the column/field. When + accessed as attribute of the TableCls class, it will return the + ColumnDescriptor itself. - When accessed as attribute of TableCls(subclass of ORMBase) instance, - it will return the value of the column/field. - When accessed as attribute of the TableCls class, - it will return the ColumnDescriptor itself. """ def __init__( @@ -162,6 +163,7 @@ class ORMBase(metaclass=ORMeta): """Base class for defining a sqlite3 table programatically. Subclass of this base class is also a subclass of dataclass. + """ @classmethod @@ -180,13 +182,15 @@ def row_to_meta(cls, row: "Union[sqlite3.Row, Dict[str, Any], Tuple[Any]]") -> S @classmethod def get_create_table_stmt(cls, table_name: str) -> str: - """Generate the sqlite query statement to create the defined table in database. + """Generate the sqlite query statement to create the defined table in + database. Args: table_name: the name of table to be created Returns: query statement to create the table defined by this class. + """ _col_descriptors: List[ColumnDescriptor] = [ getattr(cls, field.name) for field in fields(cls) @@ -213,7 +217,7 @@ def get_shape(cls) -> str: return ",".join(["?"] * len(fields(cls))) def __hash__(self) -> int: - """compute the hash with all stored fields' value.""" + """Compute the hash with all stored fields' value.""" return hash(astuple(self)) def __eq__(self, __o: object) -> bool: diff --git a/otaclient/ota_proxy/ota_cache.py b/otaclient/ota_proxy/ota_cache.py index ffd738613..45b1242f3 100644 --- a/otaclient/ota_proxy/ota_cache.py +++ b/otaclient/ota_proxy/ota_cache.py @@ -68,6 +68,7 @@ def create_cachemeta_for_request( cache_identifier: pre-collected information from caller compression_alg: pre-collected information from caller resp_headers_from_upper + """ cache_meta = CacheMeta( url=raw_url, @@ -112,6 +113,7 @@ class CacheTracker(Generic[_WEAKREF]): writer_finished: a property indicates whether the provider finished the caching. writer_failed: a property indicates whether provider fails to finish the caching. + """ READER_SUBSCRIBE_WAIT_PROVIDER_TIMEOUT = 2 @@ -125,6 +127,7 @@ def _tmp_file_naming(cls, cache_identifier: str) -> str: NOTE: append 4bytes hex to identify cache entry for the same OTA file between different trackers. + """ return ( f"{cfg.TMP_FILE_PREFIX}{cls.FNAME_PART_SEPARATOR}" @@ -203,8 +206,9 @@ def get_cache_write_gen(self) -> Optional[AsyncGenerator[int, bytes]]: async def _provider_write_cache(self) -> AsyncGenerator[int, bytes]: """Provider writes data chunks from upper caller to tmp cache file. - If cache writing failed, this method will exit and tracker.writer_failed and - tracker.writer_finished will be set. + If cache writing failed, this method will exit and + tracker.writer_failed and tracker.writer_finished will be set. + """ logger.debug(f"start to cache for {self.meta=}...") try: @@ -252,6 +256,7 @@ async def _subscribe_cache_streaming(self) -> AsyncIterator[bytes]: Raises: CacheMultipleStreamingFailed if provider failed or timeout reading data chunk from tmp cache file(might be caused by a dead provider). + """ try: err_count, _bytes_read = 0, 0 @@ -288,6 +293,7 @@ async def _read_cache(self) -> AsyncIterator[bytes]: Raises: CacheMultipleStreamingFailed if fails to read from the cached file, this might indicate a partial written cache file. + """ _bytes_read, _retry_count = 0, 0 async with aiofiles.open(self.fpath, "rb", executor=self._executor) as f: @@ -325,6 +331,7 @@ async def provider_start(self, meta: CacheMeta): Args: meta: inst of CacheMeta for the requested file tracked by this tracker. This meta is created by open_remote() method. + """ self.meta = meta self._cache_write_gen = self._provider_write_cache() @@ -353,7 +360,8 @@ async def provider_on_failed(self): self._ref = None async def subscriber_subscribe_tracker(self) -> Optional[AsyncIterator[bytes]]: - """Reader subscribe this tracker and get a file descriptor to get data chunks.""" + """Reader subscribe this tracker and get a file descriptor to get data + chunks.""" _wait_count = 0 while not self._writer_ready.is_set(): _wait_count += 1 @@ -384,11 +392,13 @@ class _Weakref: class CachingRegister: """A tracker register that manages cache trackers. - For each ongoing caching for unique OTA file, there will be only one unique identifier for it. + For each ongoing caching for unique OTA file, there will be only one + unique identifier for it. + + This first caller that requests with the identifier will become the + provider and create a new tracker for this identifier. The later + comes callers will become the subscriber to this tracker. - This first caller that requests with the identifier will become the provider and create - a new tracker for this identifier. - The later comes callers will become the subscriber to this tracker. """ def __init__(self, base_dir: Union[str, Path]): @@ -411,6 +421,7 @@ async def get_tracker( Returns: An inst of tracker, and a bool indicates the caller is provider(True), or subscriber(False). + """ _new_ref = _Weakref() _ref = self._id_ref_dict.setdefault(cache_identifier, _new_ref) @@ -439,13 +450,15 @@ async def get_tracker( class LRUCacheHelper: - """A helper class that provides API for accessing/managing cache entries in ota cachedb. + """A helper class that provides API for accessing/managing cache entries in + ota cachedb. Serveral buckets are created according to predefined file size threshould. Each bucket will maintain the cache entries of that bucket's size definition, LRU is applied on per-bucket scale. NOTE: currently entry in first bucket and last bucket will skip LRU rotate. + """ BSIZE_LIST = list(cfg.BUCKET_FILE_SIZE_DICT.keys()) @@ -485,6 +498,7 @@ async def rotate_cache(self, size: int) -> Optional[List[str]]: Returns: A list of hashes that needed to be cleaned, or empty list if rotation is not required, or None if cache rotation cannot be executed. + """ # NOTE: currently item size smaller than 1st bucket and larger than latest bucket # will be saved without cache rotating. @@ -510,7 +524,8 @@ async def cache_streaming( meta: CacheMeta, tracker: CacheTracker, ) -> AsyncIterator[bytes]: - """A cache streamer that get data chunk from and tees to multiple destination. + """A cache streamer that get data chunk from and tees to multiple + destination. Data chunk yielded from will be teed to: 1. upper uvicorn otaproxy APP to send back to client, @@ -526,6 +541,7 @@ async def cache_streaming( Raises: CacheStreamingFailed if any exception happens during retrieving. + """ async def _inner(): @@ -575,6 +591,7 @@ class OTACache: init_cache: whether to clear the existed cache, default is True. base_dir: the location to store cached files. db_file: the location to store database file. + """ def __init__( @@ -676,6 +693,7 @@ async def close(self): NOTE: cache folder cleanup on successful ota update is not performed by the OTACache. + """ logger.debug("shutdown ota-cache...") if self._cache_enabled and not self._closed: @@ -700,6 +718,7 @@ def _background_check_free_space(self): Raises: Raises FileNotFoundError if the cache folder somehow disappears during checking. + """ while not self._closed: try: @@ -756,6 +775,7 @@ async def _reserve_space(self, size: int) -> bool: Returns: A bool indicates whether the space reserving is successful or not. + """ _hashes = await self._lru_helper.rotate_cache(size) # NOTE: distinguish between [] and None! The fore one means we don't need @@ -780,6 +800,7 @@ async def _commit_cache_callback(self, meta: CacheMeta): Args: meta: inst of CacheMeta that represents a cached file. + """ try: if not self._storage_below_soft_limit_event.is_set(): @@ -805,6 +826,7 @@ def _process_raw_url(self, raw_url: str) -> str: NOTE(20221003): as otaproxy, we should treat all contents after netloc as path and not touch it, because we should forward the request as it to the remote. NOTE(20221003): unconditionally set scheme to https if enable_https, else unconditionally set to http + """ _raw_parse = urlsplit(raw_url) # get the base of the raw_url, which is :// @@ -946,6 +968,7 @@ async def retrieve_file( Returns: A tuple contains an asyncio generator for upper server app to yield data chunks from and headers dict that should be sent back to client in resp. + """ if self._closed: raise BaseOTACacheError("ota cache pool is closed") diff --git a/otaclient/ota_proxy/server_app.py b/otaclient/ota_proxy/server_app.py index 4ff5a7119..8ce01573f 100644 --- a/otaclient/ota_proxy/server_app.py +++ b/otaclient/ota_proxy/server_app.py @@ -60,6 +60,7 @@ def parse_raw_headers(raw_headers: List[Tuple[bytes, bytes]]) -> Dict[str, str]: Returns: An inst of header dict. + """ headers = {} for bheader_pair in raw_headers: @@ -83,8 +84,9 @@ def encode_headers(headers: Mapping[str, str]) -> List[Tuple[bytes, bytes]]: """Encode headers dict to list of bytes tuples for sending back to client. Uvicorn requests application to pre-process headers to bytes. - Currently we only need to send content-encoding and ota-file-cache-control header - back to client. + Currently we only need to send content-encoding and ota-file-cache- + control header back to client. + """ bytes_headers: List[Tuple[bytes, bytes]] = [] for name, value in headers.items(): @@ -126,6 +128,7 @@ class App: # NOTE: lifespan must be set to "on" for properly launching/closing ota_cache instance uvicorn.run(app, host="0.0.0.0", port=8082, log_level="debug", lifespan="on") + """ def __init__(self, ota_cache: OTACache): @@ -169,6 +172,7 @@ async def _send_chunk(data: bytes, more: bool, send): data: bytes to send to client. more bool: whether there will be a next chunk or not. send: ASGI send method. + """ if more: await send({"type": RESP_TYPE_BODY, "body": data, "more_body": True}) @@ -179,12 +183,14 @@ async def _send_chunk(data: bytes, more: bool, send): async def _init_response( status: Union[HTTPStatus, int], headers: List[Tuple[bytes, bytes]], send ): - """Helper method for constructing and sending HTTP response back to client. + """Helper method for constructing and sending HTTP response back to + client. Args: status: HTTP status code for this response. headers dict: headers to be sent in this response. send: ASGI send method. + """ await send( { @@ -258,7 +264,7 @@ async def _error_handling_during_transferring(self, url: str, send): await self._send_chunk(b"", False, send) async def _pull_data_and_send(self, url: str, scope, send): - """Streaming data between OTACache instance and ota_client + """Streaming data between OTACache instance and ota_client. Retrieves file descriptor from OTACache instance, yields chunks from file descriptor and streams chunks back to ota_client. @@ -267,6 +273,7 @@ async def _pull_data_and_send(self, url: str, scope, send): url str: URL requested by ota_client. scope: ASGI scope for current request. send: ASGI send method. + """ headers_from_client = parse_raw_headers(scope["headers"]) @@ -319,9 +326,10 @@ async def http_handler(self, scope, send): async def __call__(self, scope, receive, send): """The entrance of the ASGI application. - This method directly handles the income requests. - It filters requests, hands valid requests over to the app entry, - and handles lifespan protocol to start/stop server properly. + This method directly handles the income requests. It filters + requests, hands valid requests over to the app entry, and + handles lifespan protocol to start/stop server properly. + """ if scope["type"] == REQ_TYPE_LIFESPAN: # handling lifespan protocol diff --git a/pyproject.toml b/pyproject.toml index b8bc8ceaa..cd09995f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,9 @@ [build-system] build-backend = "setuptools.build_meta" -requires = ["setuptools>=61", "setuptools_scm[toml]>=6.4"] +requires = [ + "setuptools>=61", + "setuptools_scm[toml]>=6.4", +] [project] name = "otaclient" @@ -17,7 +20,11 @@ classifiers = [ "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", ] -dynamic = ["dependencies", "optional-dependencies", "version"] +dynamic = [ + "dependencies", + "optional-dependencies", + "version", +] [project.urls] Source = "https://github.com/tier4/ota-client" diff --git a/tests/conftest.py b/tests/conftest.py index b856edd93..9f27d4365 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -129,6 +129,7 @@ def ab_slots(tmp_path_factory: pytest.TempPathFactory) -> SlotMeta: Return: A tuple includes the path to A/B slots respectly. + """ # prepare slot_a slot_a = tmp_path_factory.mktemp("slot_a") diff --git a/tests/test_boot_control/test_rpi_boot.py b/tests/test_boot_control/test_rpi_boot.py index 59164681d..6b21f284c 100644 --- a/tests/test_boot_control/test_rpi_boot.py +++ b/tests/test_boot_control/test_rpi_boot.py @@ -81,9 +81,8 @@ class _RebootEXP(BaseException): class TestRPIBootControl: - """ - Simulating otaclient starts from slot_a, and apply ota_update to slot_b - """ + """Simulating otaclient starts from slot_a, and apply ota_update to + slot_b.""" @pytest.fixture def rpi_boot_ab_slot(self, tmp_path: Path, ab_slots: SlotMeta): diff --git a/tests/test_ota_client_stub.py b/tests/test_ota_client_stub.py index b4eea874c..60197914b 100644 --- a/tests/test_ota_client_stub.py +++ b/tests/test_ota_client_stub.py @@ -683,7 +683,8 @@ async def test_on_receive_update_request( assert self.ecu_storage.active_ota_update_present.is_set() async def test_polling_waiter_switching_from_idling_to_active(self): - """Waiter should immediately return if active_ota_update_present is set.""" + """Waiter should immediately return if active_ota_update_present is + set.""" _sleep_time, _mocked_interval = 3, 60 self.ecu_storage.IDLE_POLLING_INTERVAL = _mocked_interval # type: ignore @@ -784,10 +785,8 @@ async def setup_test( await asyncio.sleep(self.ENSURE_NEXT_CHECKING_ROUND) # ensure shutdown async def test__otaproxy_lifecycle_managing(self): - """ - otaproxy startup/shutdown is only controlled by any_requires_network - in overall ECU status report. - """ + """Otaproxy startup/shutdown is only controlled by any_requires_network + in overall ECU status report.""" # ------ otaproxy startup ------- # # --- prepartion --- # self.otaproxy_launcher.is_running = False diff --git a/tests/test_ota_proxy/test_cachedb.py b/tests/test_ota_proxy/test_cachedb.py index a5966d3a7..09d0813a1 100644 --- a/tests/test_ota_proxy/test_cachedb.py +++ b/tests/test_ota_proxy/test_cachedb.py @@ -239,9 +239,8 @@ def test_lookup_all(self): assert entries_set == set(checked_entries) def test_lookup(self): - """ - lookup the one entry in the database, and ensure the timestamp is updated - """ + """Lookup the one entry in the database, and ensure the timestamp is + updated.""" target = self.entries[-1] # lookup once to update last_acess checked_entry = self.conn.lookup_entry(CacheMeta.url, target.url) @@ -250,9 +249,7 @@ def test_lookup(self): assert checked_entry and checked_entry.last_access > target.last_access def test_delete(self): - """ - delete the whole 8MiB bucket - """ + """Delete the whole 8MiB bucket.""" bucket_size = 8 * (1024**2) assert ( self.conn.remove_entries(CacheMeta.bucket_idx, bucket_size) diff --git a/tests/test_ota_proxy/test_ota_cache.py b/tests/test_ota_proxy/test_ota_cache.py index 783826bae..8539008ff 100644 --- a/tests/test_ota_proxy/test_ota_cache.py +++ b/tests/test_ota_proxy/test_ota_cache.py @@ -99,10 +99,8 @@ async def test_rotate_cache(self): class TestOngoingCachingRegister: - """ - NOTE; currently this test only testing the weakref implementation part, - the file descriptor management part is tested in test_ota_proxy_server - """ + """NOTE; currently this test only testing the weakref implementation part, + the file descriptor management part is tested in test_ota_proxy_server.""" URL = "common_url" WORKS_NUM = 128 @@ -129,10 +127,8 @@ async def _worker( self, idx: int, ) -> Tuple[bool, Optional[CacheMeta]]: - """ - Returns tuple of bool indicates whether the worker is writter, and CacheMeta - from tracker. - """ + """Returns tuple of bool indicates whether the worker is writter, and + CacheMeta from tracker.""" # simulate multiple works subscribing the register await self.sync_event.wait() await asyncio.sleep(random.randrange(100, 200) // 100) diff --git a/tests/test_persist_file_handling.py b/tests/test_persist_file_handling.py index cc00a0c0d..6cd74b5b6 100644 --- a/tests/test_persist_file_handling.py +++ b/tests/test_persist_file_handling.py @@ -33,7 +33,6 @@ def create_files(tmp_path: Path): src = tmp_path / "src" src.mkdir() - """ src/ src/a diff --git a/tests/test_proto/test_otaclient_pb2_wrapper.py b/tests/test_proto/test_otaclient_pb2_wrapper.py index 87c63fd0f..07700f1f0 100644 --- a/tests/test_proto/test_otaclient_pb2_wrapper.py +++ b/tests/test_proto/test_otaclient_pb2_wrapper.py @@ -153,20 +153,20 @@ def test_convert_message( class Test_enum_wrapper_cooperate: def test_direct_compare(self): - """protobuf enum and wrapper enum can compare directly.""" + """Protobuf enum and wrapper enum can compare directly.""" _protobuf_enum = v2.UPDATING _wrapped = wrapper.StatusOta.UPDATING assert _protobuf_enum == _wrapped def test_assign_to_protobuf_message(self): - """wrapper enum can be directly assigned in protobuf message.""" + """Wrapper enum can be directly assigned in protobuf message.""" l, r = v2.StatusProgress(phase=v2.REGULAR), v2.StatusProgress( phase=wrapper.StatusProgressPhase.REGULAR.value, ) compare_message(l, r) def test_used_in_message_wrapper(self): - """wrapper enum can be exported.""" + """Wrapper enum can be exported.""" l, r = ( v2.StatusProgress(phase=v2.REGULAR), wrapper.StatusProgress( @@ -176,7 +176,7 @@ def test_used_in_message_wrapper(self): compare_message(l, r) def test_converted_from_protobuf_enum(self): - """wrapper enum can be converted from and to protobuf enum.""" + """Wrapper enum can be converted from and to protobuf enum.""" _protobuf_enum = v2.REGULAR _converted = wrapper.StatusProgressPhase(_protobuf_enum) assert _protobuf_enum == _converted diff --git a/tests/test_update_stats.py b/tests/test_update_stats.py index 6e978a633..19721d625 100644 --- a/tests/test_update_stats.py +++ b/tests/test_update_stats.py @@ -45,12 +45,11 @@ def update_stats_collector(self): _collector.stop() def workload(self, idx: int): - """ - idx mod 3 == 0 is DOWNLOAD, - idx mod 3 == 1 is COPY, - idx mod 3 == 2 is APPLY_UPDATE, + """Idx mod 3 == 0 is DOWNLOAD, idx mod 3 == 1 is COPY, idx mod 3 == 2 + is APPLY_UPDATE, For each op, elapsed_ns is 1, size is 2, download_bytes is 1 + """ _remainder = idx % 3 _report = RegInfProcessedStats(elapsed_ns=1, size=2) diff --git a/tools/offline_ota_image_builder/__main__.py b/tools/offline_ota_image_builder/__main__.py index 8a5ab15ba..8481e3745 100644 --- a/tools/offline_ota_image_builder/__main__.py +++ b/tools/offline_ota_image_builder/__main__.py @@ -34,6 +34,7 @@ Check README.md for the offline OTA image layout specification. Please refer to OTA cache design doc for more details. + """ diff --git a/tools/offline_ota_image_builder/builder.py b/tools/offline_ota_image_builder/builder.py index bbf31e522..89cfb4c64 100644 --- a/tools/offline_ota_image_builder/builder.py +++ b/tools/offline_ota_image_builder/builder.py @@ -61,7 +61,8 @@ def _unarchive_image(image_fpath: StrPath, *, workdir: StrPath): def _process_ota_image(ota_image_dir: StrPath, *, data_dir: StrPath, meta_dir: StrPath): - """Processing OTA image under and update and .""" + """Processing OTA image under and update and + .""" _start_time = time.time() data_dir = Path(data_dir) # statistics diff --git a/tools/offline_ota_image_builder/manifest.py b/tools/offline_ota_image_builder/manifest.py index 26e33a63d..71ddc3678 100644 --- a/tools/offline_ota_image_builder/manifest.py +++ b/tools/offline_ota_image_builder/manifest.py @@ -11,12 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""External cache source image manifest definition +"""External cache source image manifest definition. A JSON file manifest.json that contains the basic information of built external cache source image will be placed at the image rootfs. Check README.md for the spec. + """ diff --git a/tools/status_monitor/ecu_status_box.py b/tools/status_monitor/ecu_status_box.py index 61cc38c1d..d445b1b5f 100644 --- a/tools/status_monitor/ecu_status_box.py +++ b/tools/status_monitor/ecu_status_box.py @@ -65,8 +65,9 @@ def update_ecu_status( ): """Update internal contents storage with input . - This method is called by tracker module to update the contents within - the status display box. + This method is called by tracker module to update the contents + within the status display box. + """ self.index = index # the order of ECU might be changed in tracker if ecu_status == self._last_status: @@ -202,8 +203,9 @@ def render_status_box_pad( def failure_info_subwin_handler(self, stdscr: curses.window): """The handler for failure info subwin. - The current window will be replaced by this subwin, and the controller - will be replaced by this window handler. + The current window will be replaced by this subwin, and the + controller will be replaced by this window handler. + """ ScreenHandler( stdscr, @@ -214,8 +216,9 @@ def failure_info_subwin_handler(self, stdscr: curses.window): def raw_ecu_status_subwin_handler(self, stdscr: curses.window): """The handler for raw_ecu_status subwin. - The current window will be replaced by this subwin, and the controller - will be replaced by this window handler. + The current window will be replaced by this subwin, and the + controller will be replaced by this window handler. + """ ScreenHandler( stdscr, diff --git a/tools/status_monitor/utils.py b/tools/status_monitor/utils.py index 1b53f32df..24bb8d7a7 100644 --- a/tools/status_monitor/utils.py +++ b/tools/status_monitor/utils.py @@ -166,7 +166,7 @@ def page_scroll_key_handler( contents_area_max_y: int, ) -> Tuple[int, int]: """Logic for handling cursor position on pad when screen is UP/DOWN - direction scrolling, move cursor to new position. + direction scrolling, move cursor to new position. Params: hlines: the display area's count of lines @@ -174,6 +174,7 @@ def page_scroll_key_handler( Returns: A tuple of new cursor's position in (y, x). + """ new_cursor_y, new_cursor_x = last_cursor_y, last_cursor_x @@ -252,6 +253,7 @@ def draw_pad(pad: curses.window, win_h: int, win_w: int, contents: Sequence[str] """Draw contents onto a pad within a window with size of :. NOTE: after drawing, render_pad function should be called to reflect the newly updated pad. + """ pad.clear()