From de7d39c67082f5d07a9d245cceed73ae7346b65d Mon Sep 17 00:00:00 2001 From: Bodong Yang <86948717+Bodong-Yang@users.noreply.github.com> Date: Fri, 20 Dec 2024 12:14:57 +0900 Subject: [PATCH 1/6] refactor: boot_control: remove and not use the deprecated get_standby_boot_dir API (#461) This PR removes the deprecated boot_control API get_standby_boot_dir from boot_control implementations, as this API always returns /boot since them. Since 20230907, each boot_control implementation will not expect files under /boot dir to be directly downloaded on separated boot partition anymore. Instead, all the files, including files under /boot folder, will be saved directly to standby slot, and then the boot_control implementation takes the responsibility to copy the required files at /boot folder to separate boot partitions if any at post-update stage. Note that this PR doesn't change create_standby implementation, which still expects boot_dir param on init, but the behavior will not change as the boot_dir param will still be provided with /boot. --- src/otaclient/boot_control/_grub.py | 5 +---- src/otaclient/boot_control/_jetson_cboot.py | 3 --- src/otaclient/boot_control/_jetson_uefi.py | 3 --- src/otaclient/boot_control/_rpi_boot.py | 3 --- src/otaclient/boot_control/_slot_mnt_helper.py | 12 +----------- src/otaclient/boot_control/protocol.py | 18 ------------------ src/otaclient/ota_core.py | 7 ++++++- tests/test_otaclient/test_create_standby.py | 1 - 8 files changed, 8 insertions(+), 44 deletions(-) diff --git a/src/otaclient/boot_control/_grub.py b/src/otaclient/boot_control/_grub.py index 996764917..6b79f4ca8 100644 --- a/src/otaclient/boot_control/_grub.py +++ b/src/otaclient/boot_control/_grub.py @@ -848,7 +848,7 @@ def _cleanup_standby_ota_partition_folder(self): def _copy_boot_files_from_standby_slot(self): """Copy boot files under /boot to standby ota-partition folder.""" standby_ota_partition_dir = self._ota_status_control.standby_ota_status_dir - for f in self._mp_control.standby_boot_dir.iterdir(): + for f in (self._mp_control.standby_slot_mount_point / "boot").iterdir(): if f.is_file() and not f.is_symlink(): shutil.copy(f, standby_ota_partition_dir) @@ -857,9 +857,6 @@ def _copy_boot_files_from_standby_slot(self): def get_standby_slot_path(self) -> Path: # pragma: no cover return self._mp_control.standby_slot_mount_point - def get_standby_boot_dir(self) -> Path: # pragma: no cover - return self._mp_control.standby_boot_dir - def load_version(self) -> str: # pragma: no cover return self._ota_status_control.load_active_slot_version() diff --git a/src/otaclient/boot_control/_jetson_cboot.py b/src/otaclient/boot_control/_jetson_cboot.py index 0c0bcd5d4..ac240aaf7 100644 --- a/src/otaclient/boot_control/_jetson_cboot.py +++ b/src/otaclient/boot_control/_jetson_cboot.py @@ -584,9 +584,6 @@ def _firmware_update(self) -> bool | None: def get_standby_slot_path(self) -> Path: # pragma: no cover return self._mp_control.standby_slot_mount_point - def get_standby_boot_dir(self) -> Path: # pragma: no cover - return self._mp_control.standby_boot_dir - def pre_update(self, version: str, *, standby_as_ref: bool, erase_standby: bool): try: logger.info("jetson-cboot: pre-update ...") diff --git a/src/otaclient/boot_control/_jetson_uefi.py b/src/otaclient/boot_control/_jetson_uefi.py index dd9a6523b..606df4910 100644 --- a/src/otaclient/boot_control/_jetson_uefi.py +++ b/src/otaclient/boot_control/_jetson_uefi.py @@ -962,9 +962,6 @@ def _firmware_update(self) -> bool: def get_standby_slot_path(self) -> Path: # pragma: no cover return self._mp_control.standby_slot_mount_point - def get_standby_boot_dir(self) -> Path: # pragma: no cover - return self._mp_control.standby_boot_dir - def pre_update(self, version: str, *, standby_as_ref: bool, erase_standby: bool): try: logger.info("jetson-uefi: pre-update ...") diff --git a/src/otaclient/boot_control/_rpi_boot.py b/src/otaclient/boot_control/_rpi_boot.py index ace80c7b0..69cff1aef 100644 --- a/src/otaclient/boot_control/_rpi_boot.py +++ b/src/otaclient/boot_control/_rpi_boot.py @@ -479,9 +479,6 @@ def _write_standby_fstab(self): def get_standby_slot_path(self) -> Path: # pragma: no cover return self._mp_control.standby_slot_mount_point - def get_standby_boot_dir(self) -> Path: # pragma: no cover - return self._mp_control.standby_boot_dir - def pre_update(self, version: str, *, standby_as_ref: bool, erase_standby: bool): try: logger.info("rpi_boot: pre-update setup...") diff --git a/src/otaclient/boot_control/_slot_mnt_helper.py b/src/otaclient/boot_control/_slot_mnt_helper.py index 868850b57..ec95998f8 100644 --- a/src/otaclient/boot_control/_slot_mnt_helper.py +++ b/src/otaclient/boot_control/_slot_mnt_helper.py @@ -23,7 +23,7 @@ from pathlib import Path from otaclient.configs.cfg import cfg -from otaclient_common import cmdhelper, replace_root +from otaclient_common import cmdhelper from otaclient_common.typing import StrOrPath logger = logging.getLogger(__name__) @@ -46,16 +46,6 @@ def __init__( self.standby_slot_mount_point = Path(standby_slot_mount_point) self.active_slot_mount_point = Path(active_slot_mount_point) - # standby slot /boot dir - # NOTE(20230907): this will always be /boot, - # in the future this attribute will not be used by - # standby slot creater. - self.standby_boot_dir = Path( - replace_root( - cfg.BOOT_DPATH, cfg.CANONICAL_ROOT, self.standby_slot_mount_point - ) - ) - # ensure the each mount points being umounted at termination atexit.register( partial( diff --git a/src/otaclient/boot_control/protocol.py b/src/otaclient/boot_control/protocol.py index 5fda7d60b..27b6fe75d 100644 --- a/src/otaclient/boot_control/protocol.py +++ b/src/otaclient/boot_control/protocol.py @@ -19,8 +19,6 @@ from pathlib import Path from typing import Protocol -from typing_extensions import deprecated - from otaclient._types import OTAStatus @@ -39,22 +37,6 @@ def get_booted_ota_status(self) -> OTAStatus: def get_standby_slot_path(self) -> Path: """Get the Path points to the standby slot mount point.""" - @deprecated( - "standby slot creator doesn't need to treat the files under /boot specially" - ) - @abstractmethod - def get_standby_boot_dir(self) -> Path: - """Get the Path points to the standby slot's boot folder. - - NOTE(20230907): this will always return the path to - /boot. - DEPRECATED(20230907): standby slot creator doesn't need to - treat the files under /boot specially, it is - boot controller's responsibility to get the - kernel/initrd.img from standby slot and prepare - them to actual boot dir. - """ - @abstractmethod def load_version(self) -> str: """Read the version info from the current slot.""" diff --git a/src/otaclient/ota_core.py b/src/otaclient/ota_core.py index c36759cad..78cb25f9b 100644 --- a/src/otaclient/ota_core.py +++ b/src/otaclient/ota_core.py @@ -489,9 +489,14 @@ def _execute_update(self): self._ota_tmp_image_meta_dir_on_standby.mkdir(exist_ok=True) # ------ in-update ------ # + # NOTE(20230907): standby slot creator doesn't need to + # treat the files under /boot specially, it is + # boot controller's responsibility to get the + # kernel/initrd.img from standby slot and prepare + # them to actual boot dir. standby_slot_creator = self._create_standby_cls( ota_metadata=otameta, - boot_dir=str(self._boot_controller.get_standby_boot_dir()), + boot_dir=str(Path(cfg.STANDBY_SLOT_MNT) / "boot"), active_slot_mount_point=cfg.ACTIVE_SLOT_MNT, standby_slot_mount_point=cfg.STANDBY_SLOT_MNT, status_report_queue=self._status_report_queue, diff --git a/tests/test_otaclient/test_create_standby.py b/tests/test_otaclient/test_create_standby.py index cb2abf3bf..444449114 100644 --- a/tests/test_otaclient/test_create_standby.py +++ b/tests/test_otaclient/test_create_standby.py @@ -88,7 +88,6 @@ def mock_setup(self, mocker: MockerFixture, prepare_ab_slots): self._boot_control = typing.cast( BootControllerProtocol, mocker.MagicMock(spec=BootControllerProtocol) ) - self._boot_control.get_standby_boot_dir.return_value = self.slot_b_boot_dir # type: ignore # ------ mock otaclient cfg ------ # mocker.patch(f"{OTA_CORE_MODULE}.cfg.STANDBY_SLOT_MNT", str(self.slot_b)) From 4f830178ed210886c54770f46082e2006599ca27 Mon Sep 17 00:00:00 2001 From: Bodong Yang <86948717+Bodong-Yang@users.noreply.github.com> Date: Fri, 20 Dec 2024 12:58:37 +0900 Subject: [PATCH 2/6] chore: logging the firmware_version of current active_slot and proxy_info.yaml on startup (#463) --- src/otaclient/main.py | 1 + src/otaclient/ota_core.py | 1 + 2 files changed, 2 insertions(+) diff --git a/src/otaclient/main.py b/src/otaclient/main.py index cecb4391c..2b9c15306 100644 --- a/src/otaclient/main.py +++ b/src/otaclient/main.py @@ -91,6 +91,7 @@ def main() -> None: # pragma: no cover logger.info("started") logger.info(f"otaclient version: {__version__}") logger.info(f"ecu_info.yaml: \n{ecu_info}") + logger.info(f"proxy_info.yaml: \n{proxy_info}") check_other_otaclient(cfg.OTACLIENT_PID_FILE) create_otaclient_rundir(cfg.RUN_DIR) diff --git a/src/otaclient/ota_core.py b/src/otaclient/ota_core.py index 78cb25f9b..203f719c1 100644 --- a/src/otaclient/ota_core.py +++ b/src/otaclient/ota_core.py @@ -703,6 +703,7 @@ def __init__( self.started = True logger.info("otaclient started") + logger.info(f"firmware_version: {self.current_version}") def _on_failure( self, From a3da76321f19c1fd9cfb94974c44037e41bcabe2 Mon Sep 17 00:00:00 2001 From: Bodong Yang <86948717+Bodong-Yang@users.noreply.github.com> Date: Fri, 20 Dec 2024 16:47:02 +0900 Subject: [PATCH 3/6] feat: otaclient.configs.cfg: exposes ECU_INFO_LOADED_SUCCESSFULLY and PROXY_INFO_LOADED_SUCCESSFULLY to indicate whether default ecu_info/proxy_info are used (#464) This PR introduces the changes to indicate whether default ecu_info/proxy_info is used due to ecu_info.yaml file/proxy_info.yaml file is broken. This is for later implementing the feature of rejecting any OTA update request when ecu_info.yaml is broken. --- src/otaclient/configs/_ecu_info.py | 18 ++++++++++++---- src/otaclient/configs/_proxy_info.py | 18 ++++++++++++---- src/otaclient/configs/cfg.py | 21 +++++++++++++++++-- tests/conftest.py | 6 ++++-- .../test_configs/test_ecu_info.py | 2 +- .../test_configs/test_proxy_info.py | 2 +- 6 files changed, 53 insertions(+), 14 deletions(-) diff --git a/src/otaclient/configs/_ecu_info.py b/src/otaclient/configs/_ecu_info.py index c1ea4ad10..454a698b7 100644 --- a/src/otaclient/configs/_ecu_info.py +++ b/src/otaclient/configs/_ecu_info.py @@ -115,25 +115,35 @@ def get_available_ecu_ids(self) -> list[str]: # NOTE: this is backward compatibility for old x1 that doesn't have # ecu_info.yaml installed. +# NOTE(20241220): this is also the minimum workable ecu_info.yaml that +# exposes otaclient OTA API at local. This default settings should +# allow a single ECU setup works without problem. DEFAULT_ECU_INFO = ECUInfo( format_version=1, # current version is 1 ecu_id="autoware", # should be unique for each ECU in vehicle ) -def parse_ecu_info(ecu_info_file: StrOrPath) -> ECUInfo: +def parse_ecu_info(ecu_info_file: StrOrPath) -> tuple[bool, ECUInfo]: + """Parse the ecu_info.yaml file located at . + + Returns: + tuple[bool, ECUInfo]: bool indicates whether the provided ecu_info.yaml file + is loaded properly, if False, it means loading ecu_info.yaml file failed and + the default ecu_info is used. + """ try: _raw_yaml_str = Path(ecu_info_file).read_text() except FileNotFoundError as e: logger.warning(f"{ecu_info_file=} not found: {e!r}") logger.warning(f"use default ecu_info: {DEFAULT_ECU_INFO}") - return DEFAULT_ECU_INFO + return False, DEFAULT_ECU_INFO try: loaded_ecu_info = yaml.safe_load(_raw_yaml_str) assert isinstance(loaded_ecu_info, dict), "not a valid yaml file" - return ECUInfo.model_validate(loaded_ecu_info, strict=True) + return True, ECUInfo.model_validate(loaded_ecu_info, strict=True) except Exception as e: logger.warning(f"{ecu_info_file=} is invalid: {e!r}\n{_raw_yaml_str=}") logger.warning(f"use default ecu_info: {DEFAULT_ECU_INFO}") - return DEFAULT_ECU_INFO + return False, DEFAULT_ECU_INFO diff --git a/src/otaclient/configs/_proxy_info.py b/src/otaclient/configs/_proxy_info.py index 4716ce09a..9b140c13f 100644 --- a/src/otaclient/configs/_proxy_info.py +++ b/src/otaclient/configs/_proxy_info.py @@ -123,26 +123,36 @@ def _deprecation_check(_in: dict[str, Any]) -> None: # NOTE: this default is for backward compatible with old device # that doesn't have proxy_info.yaml installed. +# NOTE(20241220): this default proxy_info should allow a single ECU +# environment works without problem. DEFAULT_PROXY_INFO = ProxyInfo( format_version=1, enable_local_ota_proxy=True, + upper_ota_proxy=None, ) -def parse_proxy_info(proxy_info_file: StrOrPath) -> ProxyInfo: +def parse_proxy_info(proxy_info_file: StrOrPath) -> tuple[bool, ProxyInfo]: + """Parse the proxy_info.yaml file located at . + + Returns: + tuple[bool, ProxyInfo]: bool indicates whether the provided proxy_info.yaml file + is loaded properly, if False, it means loading proxy_info.yaml file failed and + the default proxy_info is used. + """ try: _raw_yaml_str = Path(proxy_info_file).read_text() except FileNotFoundError as e: logger.warning(f"{proxy_info_file=} not found: {e!r}") logger.warning(f"use default proxy_info: {DEFAULT_PROXY_INFO}") - return DEFAULT_PROXY_INFO + return False, DEFAULT_PROXY_INFO try: loaded_proxy_info = yaml.safe_load(_raw_yaml_str) assert isinstance(loaded_proxy_info, dict), "not a valid yaml file" _deprecation_check(loaded_proxy_info) - return ProxyInfo.model_validate(loaded_proxy_info, strict=True) + return True, ProxyInfo.model_validate(loaded_proxy_info, strict=True) except Exception as e: logger.warning(f"{proxy_info_file=} is invalid: {e!r}\n{_raw_yaml_str=}") logger.warning(f"use default proxy_info: {DEFAULT_PROXY_INFO}") - return DEFAULT_PROXY_INFO + return False, DEFAULT_PROXY_INFO diff --git a/src/otaclient/configs/cfg.py b/src/otaclient/configs/cfg.py index 4071ce4b9..a3daff035 100644 --- a/src/otaclient/configs/cfg.py +++ b/src/otaclient/configs/cfg.py @@ -38,6 +38,19 @@ cfg_configurable = set_configs() cfg_consts = Consts() +ECU_INFO_LOADED_SUCCESSFULLY: bool +"""A const set at startup time ecu_info.yaml parsing. + +If it is False, it means that the ecu_info.yaml file is invalid, + and the default ecu_info(defined in _ecu_info module) is used. +""" +PROXY_INFO_LOADED_SUCCESSFULLY: bool +"""A const set at startup time proxy_info.yaml parsing. + +If it is False, it means that the proxy_info.yaml file is invalid, + and the default proxy_info(defined in _proxy_info module) is used. +""" + if TYPE_CHECKING: class _OTAClientConfigs(ConfigurableSettings, Consts): @@ -59,5 +72,9 @@ def __getattr__(self, name: str) -> Any: cfg = _OTAClientConfigs() -ecu_info = parse_ecu_info(ecu_info_file=cfg.ECU_INFO_FPATH) -proxy_info = parse_proxy_info(proxy_info_file=cfg.PROXY_INFO_FPATH) +ECU_INFO_LOADED_SUCCESSFULLY, ecu_info = parse_ecu_info( + ecu_info_file=cfg.ECU_INFO_FPATH +) +PROXY_INFO_LOADED_SUCCESSFULLY, proxy_info = parse_proxy_info( + proxy_info_file=cfg.PROXY_INFO_FPATH +) diff --git a/tests/conftest.py b/tests/conftest.py index 1df11b532..cd6f54f5a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -252,14 +252,16 @@ def setup_executor(self, mocker: pytest_mock.MockerFixture): def ecu_info_fixture(tmp_path: Path) -> ECUInfo: _yaml_f = tmp_path / "ecu_info.yaml" _yaml_f.write_text(ECU_INFO_YAML) - return parse_ecu_info(_yaml_f) + _, res = parse_ecu_info(_yaml_f) + return res @pytest.fixture def proxy_info_fixture(tmp_path: Path) -> ProxyInfo: _yaml_f = tmp_path / "proxy_info.yaml" _yaml_f.write_text(PROXY_INFO_YAML) - return parse_proxy_info(_yaml_f) + _, res = parse_proxy_info(_yaml_f) + return res MAX_TRACEBACK_SIZE = 2048 diff --git a/tests/test_otaclient/test_configs/test_ecu_info.py b/tests/test_otaclient/test_configs/test_ecu_info.py index 5c4b640f1..b85c31e0b 100644 --- a/tests/test_otaclient/test_configs/test_ecu_info.py +++ b/tests/test_otaclient/test_configs/test_ecu_info.py @@ -143,7 +143,7 @@ def test_ecu_info(tmp_path: Path, ecu_info_yaml: str, expected_ecu_info: ECUInfo (ecu_info_file := ota_dir / "ecu_info.yaml").write_text(ecu_info_yaml) # --- execution --- # - loaded_ecu_info = parse_ecu_info(ecu_info_file) + _, loaded_ecu_info = parse_ecu_info(ecu_info_file) # --- assertion --- # assert loaded_ecu_info == expected_ecu_info diff --git a/tests/test_otaclient/test_configs/test_proxy_info.py b/tests/test_otaclient/test_configs/test_proxy_info.py index 5f66e985b..e2b2d40f3 100644 --- a/tests/test_otaclient/test_configs/test_proxy_info.py +++ b/tests/test_otaclient/test_configs/test_proxy_info.py @@ -106,6 +106,6 @@ def test_proxy_info(tmp_path: Path, _input_yaml: str, _expected: ProxyInfo): proxy_info_file = tmp_path / "proxy_info.yml" proxy_info_file.write_text(_input_yaml) - _proxy_info = parse_proxy_info(str(proxy_info_file)) + _, _proxy_info = parse_proxy_info(str(proxy_info_file)) assert _proxy_info == _expected From 4fb90ad3b13ab3b3c54738c7d6656e57ec1356ea Mon Sep 17 00:00:00 2001 From: Bodong Yang <86948717+Bodong-Yang@users.noreply.github.com> Date: Mon, 23 Dec 2024 17:37:49 +0900 Subject: [PATCH 4/6] feat: ota_proxy: migrate to anyio, drop deps of aiofiles (#467) This PR introduces to use anyio as asyncio environment for ota_proxy, drops the use of aiofiles to use async file operations supports from anyio. Also this PR fixes the problem of ota_proxy launched by otaclient actually not using `uvloop` although we configure to use it. The main entry `run_otaproxy` is fixed and now the ota_proxy will be properly launched with uvloop. Other changes: 1. otaclient: integrate to use the new `run_otaproxy` entrypoint to launch ota_proxy. 2. fix up test files accordingly. DEPS: 1. add anyio and drop aiofiles. 2. bump to use simple-sqlite3-orm v0.7.0. --- pyproject.toml | 4 +- requirements.txt | 4 +- src/ota_proxy/__init__.py | 5 +- src/ota_proxy/__main__.py | 26 ++++------- src/ota_proxy/cache_streaming.py | 14 +++--- src/ota_proxy/ota_cache.py | 46 ++++++++++--------- src/ota_proxy/utils.py | 9 ++-- src/otaclient/_otaproxy_ctx.py | 23 ++++------ tests/test_ota_proxy/test_cache_streaming.py | 1 - .../test_subprocess_launch_otaproxy.py | 21 ++++----- 10 files changed, 70 insertions(+), 83 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a01d5fbd3..8a2ab5a74 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dynamic = [ "version", ] dependencies = [ - "aiofiles<25,>=24.1", + "anyio>=4.5.1,<5", "aiohttp>=3.10.11,<3.12", "cryptography>=43.0.1,<45", "grpcio>=1.53.2,<1.69", @@ -35,7 +35,7 @@ dependencies = [ "pydantic-settings<3,>=2.3", "pyyaml<7,>=6.0.1", "requests<2.33,>=2.32", - "simple-sqlite3-orm<0.7,>=0.6", + "simple-sqlite3-orm<0.8,>=0.7", "typing-extensions>=4.6.3", "urllib3<2.3,>=2.2.2", "uvicorn[standard]>=0.30,<0.35", diff --git a/requirements.txt b/requirements.txt index 0f054df2e..dbf3e8c60 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ # Automatically generated from pyproject.toml by gen_requirements_txt.py script. # DO NOT EDIT! Only for reference use. -aiofiles<25,>=24.1 +anyio>=4.5.1,<5 aiohttp>=3.10.11,<3.12 cryptography>=43.0.1,<45 grpcio>=1.53.2,<1.69 @@ -11,7 +11,7 @@ pydantic<3,>=2.10 pydantic-settings<3,>=2.3 pyyaml<7,>=6.0.1 requests<2.33,>=2.32 -simple-sqlite3-orm<0.7,>=0.6 +simple-sqlite3-orm<0.8,>=0.7 typing-extensions>=4.6.3 urllib3<2.3,>=2.2.2 uvicorn[standard]>=0.30,<0.35 diff --git a/src/ota_proxy/__init__.py b/src/ota_proxy/__init__.py index 7425d2811..bcce9177a 100644 --- a/src/ota_proxy/__init__.py +++ b/src/ota_proxy/__init__.py @@ -33,7 +33,7 @@ ) -async def run_otaproxy( +def run_otaproxy( host: str, port: int, *, @@ -45,6 +45,7 @@ async def run_otaproxy( enable_https: bool, external_cache_mnt_point: str | None = None, ): + import anyio import uvicorn from . import App, OTACache @@ -69,4 +70,4 @@ async def run_otaproxy( http="h11", ) _server = uvicorn.Server(_config) - await _server.serve() + anyio.run(_server.serve, backend="asyncio", backend_options={"use_uvloop": True}) diff --git a/src/ota_proxy/__main__.py b/src/ota_proxy/__main__.py index 2c7aad4ff..1814ddd8a 100644 --- a/src/ota_proxy/__main__.py +++ b/src/ota_proxy/__main__.py @@ -16,11 +16,8 @@ from __future__ import annotations import argparse -import asyncio import logging -import uvloop - from . import run_otaproxy from .config import config as cfg @@ -78,17 +75,14 @@ args = parser.parse_args() logger.info(f"launch ota_proxy at {args.host}:{args.port}") - uvloop.install() - asyncio.run( - run_otaproxy( - host=args.host, - port=args.port, - cache_dir=args.cache_dir, - cache_db_f=args.cache_db_file, - enable_cache=args.enable_cache, - upper_proxy=args.upper_proxy, - enable_https=args.enable_https, - init_cache=args.init_cache, - external_cache_mnt_point=args.external_cache_mnt_point, - ) + run_otaproxy( + host=args.host, + port=args.port, + cache_dir=args.cache_dir, + cache_db_f=args.cache_db_file, + enable_cache=args.enable_cache, + upper_proxy=args.upper_proxy, + enable_https=args.enable_https, + init_cache=args.init_cache, + external_cache_mnt_point=args.external_cache_mnt_point, ) diff --git a/src/ota_proxy/cache_streaming.py b/src/ota_proxy/cache_streaming.py index 17e14b02f..5ffaf6016 100644 --- a/src/ota_proxy/cache_streaming.py +++ b/src/ota_proxy/cache_streaming.py @@ -22,11 +22,11 @@ import os import threading import weakref -from concurrent.futures import Executor from pathlib import Path from typing import AsyncGenerator, AsyncIterator, Callable, Coroutine -import aiofiles +import anyio +from anyio import open_file from otaclient_common.common import get_backoff from otaclient_common.typing import StrOrPath @@ -101,11 +101,10 @@ def __init__( *, base_dir: StrOrPath, commit_cache_cb: _CACHE_ENTRY_REGISTER_CALLBACK, - executor: Executor, below_hard_limit_event: threading.Event, ): self.fpath = Path(base_dir) / self._tmp_file_naming(cache_identifier) - self.save_path = Path(base_dir) / cache_identifier + self.save_path = anyio.Path(base_dir) / cache_identifier self.cache_meta: CacheMeta | None = None self._commit_cache_cb = commit_cache_cb @@ -113,7 +112,6 @@ def __init__( self._writer_finished = asyncio.Event() self._writer_failed = asyncio.Event() - self._executor = executor self._space_availability_event = below_hard_limit_event self._bytes_written = 0 @@ -147,7 +145,7 @@ async def _provider_write_cache( """ logger.debug(f"start to cache for {cache_meta=}...") try: - async with aiofiles.open(self.fpath, "wb", executor=self._executor) as f: + async with await open_file(self.fpath, "wb") as f: _written = 0 while _data := (yield _written): if not self._space_availability_event.is_set(): @@ -179,7 +177,7 @@ async def _provider_write_cache( await self._commit_cache_cb(cache_meta) # finalize the cache file, skip finalize if the target file is # already presented. - if not self.save_path.is_file(): + if not await self.save_path.is_file(): os.link(self.fpath, self.save_path) except Exception as e: logger.warning(f"failed to write cache for {cache_meta=}: {e!r}") @@ -202,7 +200,7 @@ async def _subscriber_stream_cache(self) -> AsyncIterator[bytes]: """ err_count, _bytes_read = 0, 0 try: - async with aiofiles.open(self.fpath, "rb", executor=self._executor) as f: + async with await open_file(self.fpath, "rb") as f: while ( not self._writer_finished.is_set() or _bytes_read < self._bytes_written diff --git a/src/ota_proxy/ota_cache.py b/src/ota_proxy/ota_cache.py index 3c8fbacf0..968bccfb2 100644 --- a/src/ota_proxy/ota_cache.py +++ b/src/ota_proxy/ota_cache.py @@ -20,12 +20,13 @@ import shutil import threading import time -from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import AsyncIterator, Mapping, Optional from urllib.parse import SplitResult, quote, urlsplit import aiohttp +import anyio +import anyio.to_thread from multidict import CIMultiDict, CIMultiDictProxy from otaclient_common.common import get_backoff @@ -133,10 +134,6 @@ def __init__( db_f.unlink(missing_ok=True) self._init_cache = True # force init cache on db file cleanup - self._executor = ThreadPoolExecutor( - thread_name_prefix="ota_cache_fileio_executor" - ) - self._external_cache_data_dir = None self._external_cache_mp = None if external_cache_mnt_point and mount_external_cache(external_cache_mnt_point): @@ -145,7 +142,7 @@ def __init__( ) self._external_cache_mp = external_cache_mnt_point self._external_cache_data_dir = ( - Path(external_cache_mnt_point) / cfg.EXTERNAL_CACHE_DATA_DNAME + anyio.Path(external_cache_mnt_point) / cfg.EXTERNAL_CACHE_DATA_DNAME ) self._storage_below_hard_limit_event = threading.Event() @@ -189,11 +186,18 @@ async def start(self): # reuse the previously left ota_cache else: # cleanup unfinished tmp files - for tmp_f in self._base_dir.glob(f"{cfg.TMP_FILE_PREFIX}*"): - tmp_f.unlink(missing_ok=True) + async for tmp_f in anyio.Path(self._base_dir).glob( + f"{cfg.TMP_FILE_PREFIX}*" + ): + await tmp_f.unlink(missing_ok=True) # dispatch a background task to pulling the disk usage info - self._executor.submit(self._background_check_free_space) + _free_space_check_thread = threading.Thread( + target=self._background_check_free_space, + daemon=True, + name="ota_cache_free_space_checker", + ) + _free_space_check_thread.start() # init cache helper(and connect to ota_cache db) self._lru_helper = LRUCacheHelper( @@ -222,7 +226,6 @@ async def close(self): if not self._closed: self._closed = True await self._session.close() - self._executor.shutdown(wait=True) if self._cache_enabled: self._lru_helper.close() @@ -311,7 +314,7 @@ async def _reserve_space(self, size: int) -> bool: logger.debug( f"rotate on bucket({size=}), num of entries to be cleaned {len(_hashes)=}" ) - self._executor.submit(self._cache_entries_cleanup, _hashes) + await anyio.to_thread.run_sync(self._cache_entries_cleanup, _hashes) return True else: logger.debug(f"rotate on bucket({size=}) failed, no enough entries") @@ -429,7 +432,7 @@ async def _retrieve_file_by_cache_lookup( # NOTE: db_entry.file_sha256 can be either # 1. valid sha256 value for corresponding plain uncompressed OTA file # 2. URL based sha256 value for corresponding requested URL - cache_file = self._base_dir / cache_identifier + cache_file = anyio.Path(self._base_dir / cache_identifier) # check if cache file exists # NOTE(20240729): there is an edge condition that the finished cached file is not yet renamed, @@ -437,11 +440,11 @@ async def _retrieve_file_by_cache_lookup( # cache_commit_callback to rename the tmp file. _retry_count_max, _factor, _backoff_max = 6, 0.01, 0.1 # 0.255s in total for _retry_count in range(_retry_count_max): - if cache_file.is_file(): + if await cache_file.is_file(): break await asyncio.sleep(get_backoff(_retry_count, _factor, _backoff_max)) - if not cache_file.is_file(): + if not await cache_file.is_file(): logger.warning( f"dangling cache entry found, remove db entry: {meta_db_entry}" ) @@ -452,7 +455,7 @@ async def _retrieve_file_by_cache_lookup( # do the job. If cache is invalid, otaclient will use CacheControlHeader's retry_cache # directory to indicate invalid cache. return ( - read_file(cache_file, executor=self._executor), + read_file(cache_file), meta_db_entry.export_headers_to_client(), ) @@ -470,11 +473,11 @@ async def _retrieve_file_by_external_cache( cache_identifier = client_cache_policy.file_sha256 cache_file = self._external_cache_data_dir / cache_identifier - cache_file_zst = cache_file.with_suffix( - f".{cfg.EXTERNAL_CACHE_STORAGE_COMPRESS_ALG}" + cache_file_zst = anyio.Path( + cache_file.with_suffix(f".{cfg.EXTERNAL_CACHE_STORAGE_COMPRESS_ALG}") ) - if cache_file_zst.is_file(): + if await cache_file_zst.is_file(): _header = CIMultiDict() _header[HEADER_OTA_FILE_CACHE_CONTROL] = ( OTAFileCacheControl.export_kwargs_as_header( @@ -482,16 +485,16 @@ async def _retrieve_file_by_external_cache( file_compression_alg=cfg.EXTERNAL_CACHE_STORAGE_COMPRESS_ALG, ) ) - return read_file(cache_file_zst, executor=self._executor), _header + return read_file(cache_file_zst), _header - if cache_file.is_file(): + if await cache_file.is_file(): _header = CIMultiDict() _header[HEADER_OTA_FILE_CACHE_CONTROL] = ( OTAFileCacheControl.export_kwargs_as_header( file_sha256=cache_identifier ) ) - return read_file(cache_file, executor=self._executor), _header + return read_file(cache_file), _header async def _retrieve_file_by_new_caching( self, @@ -534,7 +537,6 @@ async def _retrieve_file_by_new_caching( tracker = CacheTracker( cache_identifier=cache_identifier, base_dir=self._base_dir, - executor=self._executor, commit_cache_cb=self._commit_cache_callback, below_hard_limit_event=self._storage_below_hard_limit_event, ) diff --git a/src/ota_proxy/utils.py b/src/ota_proxy/utils.py index a852b1597..8b5e2d617 100644 --- a/src/ota_proxy/utils.py +++ b/src/ota_proxy/utils.py @@ -1,18 +1,17 @@ from __future__ import annotations -from concurrent.futures import Executor from hashlib import sha256 from os import PathLike from typing import AsyncIterator -import aiofiles +from anyio import open_file from .config import config as cfg -async def read_file(fpath: PathLike, *, executor: Executor) -> AsyncIterator[bytes]: - """Open and read a file asynchronously with aiofiles.""" - async with aiofiles.open(fpath, "rb", executor=executor) as f: +async def read_file(fpath: PathLike) -> AsyncIterator[bytes]: + """Open and read a file asynchronously.""" + async with await open_file(fpath, "rb") as f: while data := await f.read(cfg.CHUNK_SIZE): yield data diff --git a/src/otaclient/_otaproxy_ctx.py b/src/otaclient/_otaproxy_ctx.py index e08a50638..b3e495429 100644 --- a/src/otaclient/_otaproxy_ctx.py +++ b/src/otaclient/_otaproxy_ctx.py @@ -19,7 +19,6 @@ from __future__ import annotations -import asyncio import atexit import logging import multiprocessing as mp @@ -78,18 +77,16 @@ def otaproxy_process(*, init_cache: bool) -> None: logger.info(f"wait for {upper_proxy=} online...") ensure_otaproxy_start(str(upper_proxy)) - asyncio.run( - run_otaproxy( - host=host, - port=port, - init_cache=init_cache, - cache_dir=local_otaproxy_cfg.BASE_DIR, - cache_db_f=local_otaproxy_cfg.DB_FILE, - upper_proxy=upper_proxy, - enable_cache=proxy_info.enable_local_ota_proxy_cache, - enable_https=proxy_info.gateway_otaproxy, - external_cache_mnt_point=external_cache_mnt_point, - ) + run_otaproxy( + host=host, + port=port, + init_cache=init_cache, + cache_dir=local_otaproxy_cfg.BASE_DIR, + cache_db_f=local_otaproxy_cfg.DB_FILE, + upper_proxy=upper_proxy, + enable_cache=proxy_info.enable_local_ota_proxy_cache, + enable_https=proxy_info.gateway_otaproxy, + external_cache_mnt_point=external_cache_mnt_point, ) diff --git a/tests/test_ota_proxy/test_cache_streaming.py b/tests/test_ota_proxy/test_cache_streaming.py index 8efdbcd1c..8701ad2bc 100644 --- a/tests/test_ota_proxy/test_cache_streaming.py +++ b/tests/test_ota_proxy/test_cache_streaming.py @@ -87,7 +87,6 @@ async def _worker( _tracker = CacheTracker( cache_identifier=self.URL, base_dir=self.base_dir, - executor=None, # type: ignore commit_cache_cb=None, # type: ignore below_hard_limit_event=None, # type: ignore ) diff --git a/tests/test_ota_proxy/test_subprocess_launch_otaproxy.py b/tests/test_ota_proxy/test_subprocess_launch_otaproxy.py index 817e91d86..17b3611b0 100644 --- a/tests/test_ota_proxy/test_subprocess_launch_otaproxy.py +++ b/tests/test_ota_proxy/test_subprocess_launch_otaproxy.py @@ -15,7 +15,6 @@ from __future__ import annotations -import asyncio import multiprocessing as mp import time from pathlib import Path @@ -27,17 +26,15 @@ def otaproxy_process(cache_dir: str): ota_cache_dir = Path(cache_dir) ota_cache_db = ota_cache_dir / "cache_db" - asyncio.run( - run_otaproxy( - host="127.0.0.1", - port=8082, - init_cache=True, - cache_dir=str(ota_cache_dir), - cache_db_f=str(ota_cache_db), - upper_proxy="", - enable_cache=True, - enable_https=False, - ), + run_otaproxy( + host="127.0.0.1", + port=8082, + init_cache=True, + cache_dir=str(ota_cache_dir), + cache_db_f=str(ota_cache_db), + upper_proxy="", + enable_cache=True, + enable_https=False, ) From 9fb16b9515c7c7af0160511922077e97a58a35bc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 25 Dec 2024 11:06:57 +0900 Subject: [PATCH 5/6] build(deps): Update urllib3 requirement (#468) Updates the requirements on [urllib3](https://github.com/urllib3/urllib3) to permit the latest version. - [Release notes](https://github.com/urllib3/urllib3/releases) - [Changelog](https://github.com/urllib3/urllib3/blob/main/CHANGES.rst) - [Commits](https://github.com/urllib3/urllib3/compare/2.2.2...2.3.0) --- updated-dependencies: - dependency-name: urllib3 dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- pyproject.toml | 2 +- requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8a2ab5a74..5ca94fa57 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,7 @@ dependencies = [ "requests<2.33,>=2.32", "simple-sqlite3-orm<0.8,>=0.7", "typing-extensions>=4.6.3", - "urllib3<2.3,>=2.2.2", + "urllib3>=2.2.2,<2.4", "uvicorn[standard]>=0.30,<0.35", "zstandard<0.24,>=0.22", ] diff --git a/requirements.txt b/requirements.txt index dbf3e8c60..b86a08723 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,6 @@ pyyaml<7,>=6.0.1 requests<2.33,>=2.32 simple-sqlite3-orm<0.8,>=0.7 typing-extensions>=4.6.3 -urllib3<2.3,>=2.2.2 +urllib3>=2.2.2,<2.4 uvicorn[standard]>=0.30,<0.35 zstandard<0.24,>=0.22 From fefed56000726278dd76840e67d6eddf630d8769 Mon Sep 17 00:00:00 2001 From: Bodong Yang <86948717+Bodong-Yang@users.noreply.github.com> Date: Wed, 25 Dec 2024 13:13:57 +0900 Subject: [PATCH 6/6] refactor: otaclient_common.logging: add get_burst_suppressed_logger helper for creating burst_suppressed logger (#469) This PR adds a helper method get_burst_suppressed_logger for creating burst_suppressed logger with ease. Changes of integration: 1. ota_proxy: use get_burst_suppressed_logger. 2. otaclient.api_v2: use get_burst_suppressed_logger. --- src/ota_proxy/lru_cache_helper.py | 14 +----- src/ota_proxy/server_app.py | 12 +---- src/otaclient/_status_monitor.py | 14 ++---- src/otaclient/grpc/api_v2/ecu_tracker.py | 14 ++---- src/otaclient_common/logging.py | 46 ++++++++++++++++++- .../test_logging.py | 36 +++++++++------ 6 files changed, 76 insertions(+), 60 deletions(-) rename tests/{ => test_otaclient_common}/test_logging.py (58%) diff --git a/src/ota_proxy/lru_cache_helper.py b/src/ota_proxy/lru_cache_helper.py index 8772e14d5..fcf77bb1d 100644 --- a/src/ota_proxy/lru_cache_helper.py +++ b/src/ota_proxy/lru_cache_helper.py @@ -17,27 +17,17 @@ from __future__ import annotations import bisect -import logging import sqlite3 import time from pathlib import Path from simple_sqlite3_orm import utils -from otaclient_common.logging import BurstSuppressFilter +from otaclient_common.logging import get_burst_suppressed_logger from .db import AsyncCacheMetaORM, CacheMeta -burst_suppressed_logger = logging.getLogger(f"{__name__}.db_error") -# NOTE: for request_error, only allow max 6 lines of logging per 30 seconds -burst_suppressed_logger.addFilter( - BurstSuppressFilter( - f"{__name__}.db_error", - upper_logger_name=__name__, - burst_round_length=30, - burst_max=6, - ) -) +burst_suppressed_logger = get_burst_suppressed_logger(f"{__name__}.db_error") class LRUCacheHelper: diff --git a/src/ota_proxy/server_app.py b/src/ota_proxy/server_app.py index bd1999a00..6d43b16fb 100644 --- a/src/ota_proxy/server_app.py +++ b/src/ota_proxy/server_app.py @@ -23,7 +23,7 @@ import aiohttp from multidict import CIMultiDict, CIMultiDictProxy -from otaclient_common.logging import BurstSuppressFilter +from otaclient_common.logging import get_burst_suppressed_logger from ._consts import ( BHEADER_AUTHORIZATION, @@ -46,16 +46,8 @@ from .ota_cache import OTACache logger = logging.getLogger(__name__) -burst_suppressed_logger = logging.getLogger(f"{__name__}.request_error") # NOTE: for request_error, only allow max 6 lines of logging per 30 seconds -burst_suppressed_logger.addFilter( - BurstSuppressFilter( - f"{__name__}.request_error", - upper_logger_name=__name__, - burst_round_length=30, - burst_max=6, - ) -) +burst_suppressed_logger = get_burst_suppressed_logger(f"{__name__}.request_error") # only expose app __all__ = ("App",) diff --git a/src/otaclient/_status_monitor.py b/src/otaclient/_status_monitor.py index 9db09cfe0..6aba1b1c8 100644 --- a/src/otaclient/_status_monitor.py +++ b/src/otaclient/_status_monitor.py @@ -35,19 +35,11 @@ UpdateTiming, ) from otaclient._utils import SharedOTAClientStatusWriter -from otaclient_common.logging import BurstSuppressFilter +from otaclient_common.logging import get_burst_suppressed_logger logger = logging.getLogger(__name__) -burst_suppressed_logger = logging.getLogger(f"{__name__}.shm_push") -# NOTE: for request_error, only allow max 6 lines of logging per 30 seconds -burst_suppressed_logger.addFilter( - BurstSuppressFilter( - f"{__name__}.shm_push", - upper_logger_name=__name__, - burst_round_length=30, - burst_max=6, - ) -) +# NOTE: suppress error logging for pushing OTA status to shm +burst_suppressed_logger = get_burst_suppressed_logger(f"{__name__}.shm_push") _status_report_queue: queue.Queue | None = None diff --git a/src/otaclient/grpc/api_v2/ecu_tracker.py b/src/otaclient/grpc/api_v2/ecu_tracker.py index ba454f138..ee7a26f2c 100644 --- a/src/otaclient/grpc/api_v2/ecu_tracker.py +++ b/src/otaclient/grpc/api_v2/ecu_tracker.py @@ -27,19 +27,11 @@ from otaclient.grpc.api_v2.ecu_status import ECUStatusStorage from otaclient_api.v2 import types as api_types from otaclient_api.v2.api_caller import ECUNoResponse, OTAClientCall -from otaclient_common.logging import BurstSuppressFilter +from otaclient_common.logging import get_burst_suppressed_logger logger = logging.getLogger(__name__) -burst_suppressed_logger = logging.getLogger(f"{__name__}.local_ecu_check") -# NOTE: for request_error, only allow max 6 lines of logging per 30 seconds -burst_suppressed_logger.addFilter( - BurstSuppressFilter( - f"{__name__}.local_ecu_check", - upper_logger_name=__name__, - burst_round_length=30, - burst_max=6, - ) -) +# NOTE: suppress error loggings from checking local ECU's status shm +burst_suppressed_logger = get_burst_suppressed_logger(f"{__name__}.local_ecu_check") # actively polling ECUs status until we get the first valid response # when otaclient is just starting. diff --git a/src/otaclient_common/logging.py b/src/otaclient_common/logging.py index 8e37f3e9a..4fd0c4cd9 100644 --- a/src/otaclient_common/logging.py +++ b/src/otaclient_common/logging.py @@ -13,10 +13,11 @@ # limitations under the License. +from __future__ import annotations + import itertools import logging import time -from typing import Optional class BurstSuppressFilter(logging.Filter): @@ -25,7 +26,7 @@ def __init__( name: str, burst_max: int, burst_round_length: int, - upper_logger_name: Optional[str] = None, + upper_logger_name: str | None = None, ) -> None: self.name = name self.upper_logger_name = upper_logger_name @@ -61,3 +62,44 @@ def filter(self, record: logging.LogRecord) -> bool: ) self._round_warned = True return False + + +def get_burst_suppressed_logger( + _logger: logging.Logger | str, + *, + upper_logger_name: str | None = None, + burst_max: int = 6, + burst_round_length: int = 16, +) -> logging.Logger: + """Configure the logger by <_logger> and attach a burst_suppressed_filter to it + + Args: + _logger (logging.Logger | str): an logger object or the name of the logger. + upper_logger_name (str | None, optional): upper_logger for logging the log suppressed warning. + If not specified, will be the direct upper logger of the <_logger>. Defaults to None. + burst_max (int, optional): how many logs can be emitted per round. Defaults to 6. + burst_round_length (int, optional): the time span of suppressing round. Defaults to 30 (seconds). + """ + if isinstance(_logger, str): + this_logger_name = _logger + this_logger = logging.getLogger(this_logger_name) + else: + this_logger = _logger + this_logger_name = _logger.name + + if not isinstance(upper_logger_name, str): + _parents = this_logger_name.split(".")[:-1] + if _parents: + upper_logger_name = ".".join(_parents) + else: # NOTE: will be the root logger + upper_logger_name = None + + this_logger.addFilter( + BurstSuppressFilter( + this_logger_name, + upper_logger_name=upper_logger_name, + burst_max=burst_max, + burst_round_length=burst_round_length, + ) + ) + return this_logger diff --git a/tests/test_logging.py b/tests/test_otaclient_common/test_logging.py similarity index 58% rename from tests/test_logging.py rename to tests/test_otaclient_common/test_logging.py index ef3c15e18..f1288a7bc 100644 --- a/tests/test_logging.py +++ b/tests/test_otaclient_common/test_logging.py @@ -13,40 +13,48 @@ # limitations under the License. -import logging import time from pytest import LogCaptureFixture from otaclient_common import logging as _logging +TEST_ROUND = 10 +TEST_LOGGINGS_NUM = 3000 + + +def test_burst_logging(caplog: LogCaptureFixture): + logger_name = "upper_logger.intermediate_logger.this_logger" + upper_logger = "upper_logger.intermediate_logger" -def test_BurstSuppressFilter(caplog: LogCaptureFixture): - logger_name = "test_BurstSuppressFilter" - logger = logging.getLogger(logger_name) burst_round_length = 1 - logger.addFilter( - _logging.BurstSuppressFilter( - logger_name, - burst_max=1, - burst_round_length=burst_round_length, - ) + + logger = _logging.get_burst_suppressed_logger( + logger_name, + # NOTE: test upper_logger_name calculated from logger_name + burst_max=1, + burst_round_length=burst_round_length, ) # test loggging suppressing # NOTE: outer loop ensures that suppression only works # within each burst_round, and should be refresed # in new round. - for _ in range(2): - for idx in range(2000): + for _ in range(TEST_ROUND): + for idx in range(TEST_LOGGINGS_NUM): logger.error(idx) time.sleep(burst_round_length * 2) logger.error("burst_round end") - # the four logging lines are: + # For each round, the loggings will be as follow: # 1. logger.error(idx) # idx==0 # 2. a warning of exceeded loggings are suppressed # 3. a warning of how many loggings are suppressed # 4. logger.error("burst_round end") - assert len(caplog.records) <= 4 + # NOTE that the logger.error("burst_round end") will be included in the + # next burst_suppressing roud, so excepts for the first round, we will + # only have three records. + assert len(records := caplog.records) <= 4 + # warning msg comes from upper_logger + assert records[1].name == upper_logger caplog.clear()