From 4723777221c1b61c166a7d6d08d9ec79c3258db1 Mon Sep 17 00:00:00 2001 From: Bodong Yang <86948717+Bodong-Yang@users.noreply.github.com> Date: Thu, 19 Dec 2024 15:40:20 +0900 Subject: [PATCH] fix: ecu_status: should use any_child_ecu_in_update flag instead of any_in_update to avoid self-lockedown, still wait on any_child_ecu_in_update before reboot (#459) This PR reverts the change that otaclient waits for any_requires_network instead of any_in_update. This change was introduced after otaclient v3.8.x. Also, a logic fix is introduced to use any_child_ecu_in_update instead of any_in_update. any_in_update will be set when any ECU(including self-ECU) is in UPDATE. If otaclient waits for this flag before OTA reboot(at this point the self-ECU's OTA status is still UPDATING), it will be dead-locked by itself. Fix by using any_child_ecu_in_update instead, which aligns with the previous implementation at otaclient v3.8.x. --- src/otaclient/_types.py | 2 +- src/otaclient/grpc/api_v2/ecu_status.py | 77 +++++-------------- src/otaclient/main.py | 2 +- src/otaclient/ota_core.py | 4 +- tests/test_otaclient/test_create_standby.py | 4 +- .../test_grpc/test_api_v2/test_ecu_status.py | 67 ++++++++++++++-- tests/test_otaclient/test_ota_core.py | 6 +- 7 files changed, 88 insertions(+), 74 deletions(-) diff --git a/src/otaclient/_types.py b/src/otaclient/_types.py index 92981290d..603aaea5c 100644 --- a/src/otaclient/_types.py +++ b/src/otaclient/_types.py @@ -126,7 +126,7 @@ class OTAClientStatus: @dataclass class MultipleECUStatusFlags: - any_in_update: mp_sync.Event + any_child_ecu_in_update: mp_sync.Event any_requires_network: mp_sync.Event all_success: mp_sync.Event diff --git a/src/otaclient/grpc/api_v2/ecu_status.py b/src/otaclient/grpc/api_v2/ecu_status.py index 66b8b68b3..7ba274532 100644 --- a/src/otaclient/grpc/api_v2/ecu_status.py +++ b/src/otaclient/grpc/api_v2/ecu_status.py @@ -44,13 +44,11 @@ import math import time from itertools import chain -from typing import Dict, Iterable, Optional from otaclient._types import MultipleECUStatusFlags, OTAClientStatus from otaclient.configs.cfg import cfg, ecu_info from otaclient.grpc.api_v2.types import convert_to_apiv2_status from otaclient_api.v2 import types as api_types -from otaclient_common.typing import T logger = logging.getLogger(__name__) @@ -64,23 +62,6 @@ ACTIVE_POLLING_INTERVAL = 1 # seconds -class _OrderedSet(Dict[T, None]): - def __init__(self, _input: Optional[Iterable[T]]): - if _input: - for elem in _input: - self[elem] = None - super().__init__() - - def add(self, value: T): - self[value] = None - - def remove(self, value: T): - super().pop(value) - - def discard(self, value: T): - super().pop(value, None) - - class ECUStatusStorage: def __init__( @@ -95,27 +76,13 @@ def __init__( # ECU status storage self.storage_last_updated_timestamp = 0 - # ECUs that are/will be active during an OTA session, - # at init it will be the ECUs listed in available_ecu_ids defined - # in ecu_info.yaml. - # When receives update request, the list will be set to include ECUs - # listed in the update request, and be extended by merging - # available_ecu_ids in sub ECUs' status report. - # Internally referenced when generating overall ECU status report. - # TODO: in the future if otaclient can preserve OTA session info, - # ECUStatusStorage should restore the tracked_active_ecus info - # in the saved session info. - self._tracked_active_ecus: _OrderedSet[str] = _OrderedSet( - ecu_info.get_available_ecu_ids() - ) - # The attribute that will be exported in status API response, - # NOTE(20230801): available_ecu_ids only serves information purpose, - # it should only be updated with ecu_info.yaml or merging - # available_ecu_ids field in sub ECUs' status report. # NOTE(20230801): for web.auto user, available_ecu_ids in status API response # will be used to generate update request list, so be-careful! - self._available_ecu_ids: _OrderedSet[str] = _OrderedSet( + # NOTE(20241219): we will only look at status of ECUs listed in available_ecu_ids. + # ECUs that in the secondaries field but no in available_ecu_ids field + # are considered to be the ECUs not ready for OTA. See ecu_info.yaml doc. + self._available_ecu_ids: dict[str, None] = dict.fromkeys( ecu_info.get_available_ecu_ids() ) @@ -185,7 +152,7 @@ async def _generate_overall_status_report(self): for status in chain( self._all_ecus_status_v2.values(), self._all_ecus_status_v1.values() ) - if status.ecu_id in self._tracked_active_ecus + if status.ecu_id in self._available_ecu_ids and status.is_in_update and status.ecu_id not in lost_ecus } @@ -195,10 +162,11 @@ async def _generate_overall_status_report(self): "new ECU(s) that acks update request and enters OTA update detected" f"{_new_in_update_ecu}, current updating ECUs: {in_update_ecus_id}" ) - if in_update_ecus_id: - ecu_status_flags.any_in_update.set() + + if self.in_update_child_ecus_id: + ecu_status_flags.any_child_ecu_in_update.set() else: - ecu_status_flags.any_in_update.clear() + ecu_status_flags.any_child_ecu_in_update.clear() # check if there is any failed child/self ECU in tracked active ECUs set _old_failed_ecus_id = self.failed_ecus_id @@ -207,7 +175,7 @@ async def _generate_overall_status_report(self): for status in chain( self._all_ecus_status_v2.values(), self._all_ecus_status_v1.values() ) - if status.ecu_id in self._tracked_active_ecus + if status.ecu_id in self._available_ecu_ids and status.is_failed and status.ecu_id not in lost_ecus } @@ -223,7 +191,7 @@ async def _generate_overall_status_report(self): for status in chain( self._all_ecus_status_v2.values(), self._all_ecus_status_v1.values() ) - if status.ecu_id in self._tracked_active_ecus + if status.ecu_id in self._available_ecu_ids and status.ecu_id not in lost_ecus ) ): @@ -240,12 +208,12 @@ async def _generate_overall_status_report(self): for status in chain( self._all_ecus_status_v2.values(), self._all_ecus_status_v1.values() ) - if status.ecu_id in self._tracked_active_ecus + if status.ecu_id in self._available_ecu_ids and status.is_success and status.ecu_id not in lost_ecus } # NOTE: all_success doesn't count the lost ECUs - if len(self.success_ecus_id) == len(self._tracked_active_ecus): + if self.success_ecus_id == set(self._available_ecu_ids): ecu_status_flags.all_success.set() else: ecu_status_flags.all_success.clear() @@ -334,19 +302,20 @@ async def on_ecus_accept_update_request(self, ecus_accept_update: set[str]): """ ecu_status_flags = self.ecu_status_flags async with self._properties_update_lock: - self._tracked_active_ecus = _OrderedSet(ecus_accept_update) - self.last_update_request_received_timestamp = int(time.time()) self.lost_ecus_id -= ecus_accept_update self.failed_ecus_id -= ecus_accept_update + self.success_ecus_id -= ecus_accept_update self.in_update_ecus_id.update(ecus_accept_update) self.in_update_child_ecus_id = self.in_update_ecus_id - {self.my_ecu_id} - self.success_ecus_id -= ecus_accept_update ecu_status_flags.all_success.clear() ecu_status_flags.any_requires_network.set() - ecu_status_flags.any_in_update.set() + if self.in_update_child_ecus_id: + ecu_status_flags.any_child_ecu_in_update.set() + else: + ecu_status_flags.any_child_ecu_in_update.clear() def get_polling_interval(self) -> int: """Return if there is active OTA update, @@ -355,11 +324,8 @@ def get_polling_interval(self) -> int: NOTE: use get_polling_waiter if want to wait, only call this method if one only wants to get the polling interval value. """ - ecu_status_flags = self.ecu_status_flags return ( - ACTIVE_POLLING_INTERVAL - if ecu_status_flags.any_in_update.is_set() - else IDLE_POLLING_INTERVAL + ACTIVE_POLLING_INTERVAL if self.in_update_ecus_id else IDLE_POLLING_INTERVAL ) def get_polling_waiter(self): @@ -377,13 +343,12 @@ def get_polling_waiter(self): _inner_wait_interval = 1 # second async def _waiter(): - ecu_status_flags = self.ecu_status_flags - if ecu_status_flags.any_in_update.is_set(): + if self.in_update_ecus_id: await asyncio.sleep(ACTIVE_POLLING_INTERVAL) return for _ in range(math.ceil(IDLE_POLLING_INTERVAL / _inner_wait_interval)): - if ecu_status_flags.any_in_update.is_set(): + if self.in_update_ecus_id: return await asyncio.sleep(_inner_wait_interval) diff --git a/src/otaclient/main.py b/src/otaclient/main.py index 4d0316993..cecb4391c 100644 --- a/src/otaclient/main.py +++ b/src/otaclient/main.py @@ -116,7 +116,7 @@ def main() -> None: # pragma: no cover local_otaclient_op_queue = mp_ctx.Queue() local_otaclient_resp_queue = mp_ctx.Queue() ecu_status_flags = MultipleECUStatusFlags( - any_in_update=mp_ctx.Event(), + any_child_ecu_in_update=mp_ctx.Event(), any_requires_network=mp_ctx.Event(), all_success=mp_ctx.Event(), ) diff --git a/src/otaclient/ota_core.py b/src/otaclient/ota_core.py index 77696ff64..c36759cad 100644 --- a/src/otaclient/ota_core.py +++ b/src/otaclient/ota_core.py @@ -579,11 +579,9 @@ def _execute_update(self): ) ) - # NOTE: we don't need to wait for sub ECUs if sub ECUs don't - # depend on otaproxy on this ECU. if proxy_info.enable_local_ota_proxy: wait_and_log( - check_flag=self.ecu_status_flags.any_requires_network.is_set, + check_flag=self.ecu_status_flags.any_child_ecu_in_update.is_set, check_for=False, message="permit reboot flag", log_func=logger.info, diff --git a/tests/test_otaclient/test_create_standby.py b/tests/test_otaclient/test_create_standby.py index b0abbbec8..cb2abf3bf 100644 --- a/tests/test_otaclient/test_create_standby.py +++ b/tests/test_otaclient/test_create_standby.py @@ -106,7 +106,7 @@ def test_update_with_rebuild_mode( ): status_collector, status_report_queue = ota_status_collector ecu_status_flags = mocker.MagicMock() - ecu_status_flags.any_requires_network.is_set = mocker.MagicMock( + ecu_status_flags.any_child_ecu_in_update.is_set = mocker.MagicMock( return_value=False ) @@ -145,7 +145,7 @@ def test_update_with_rebuild_mode( # ------ assertions ------ # persist_handler.assert_called_once() - ecu_status_flags.any_requires_network.is_set.assert_called_once() + ecu_status_flags.any_child_ecu_in_update.is_set.assert_called_once() # --- ensure the update stats are collected collected_status = status_collector.otaclient_status assert collected_status diff --git a/tests/test_otaclient/test_grpc/test_api_v2/test_ecu_status.py b/tests/test_otaclient/test_grpc/test_api_v2/test_ecu_status.py index 8e3081f8d..d032c9c74 100644 --- a/tests/test_otaclient/test_grpc/test_api_v2/test_ecu_status.py +++ b/tests/test_otaclient/test_grpc/test_api_v2/test_ecu_status.py @@ -53,7 +53,7 @@ async def setup_test(self, mocker: MockerFixture, ecu_info_fixture: ECUInfo): # init and setup the ecu_storage # NOTE: here we use threading.Event instead self.ecu_status_flags = ecu_status_flags = MultipleECUStatusFlags( - any_in_update=threading.Event(), # type: ignore[assignment] + any_child_ecu_in_update=threading.Event(), # type: ignore[assignment] any_requires_network=threading.Event(), # type: ignore[assignment] all_success=threading.Event(), # type: ignore[assignment] ) @@ -380,7 +380,7 @@ async def test_export( }, # ecu_status_flags { - "any_in_update": True, + "any_child_ecu_in_update": True, "any_requires_network": True, "all_success": False, }, @@ -429,7 +429,55 @@ async def test_export( }, # ecu_status_flags { - "any_in_update": True, + "any_child_ecu_in_update": True, + "any_requires_network": True, + "all_success": False, + }, + ), + # case 3: + # only main ECU doing OTA update. + ( + # local ECU status: UPDATING + _internal_types.OTAClientStatus( + ota_status=_internal_types.OTAStatus.UPDATING, + update_phase=_internal_types.UpdatePhase.DOWNLOADING_OTA_FILES, + ), + # sub ECUs status + [ + # p1: SUCCESS + api_types.StatusResponse( + available_ecu_ids=["p1"], + ecu_v2=[ + api_types.StatusResponseEcuV2( + ecu_id="p1", + ota_status=api_types.StatusOta.SUCCESS, + ), + ], + ), + # p2: SUCCESS + api_types.StatusResponse( + available_ecu_ids=["p2"], + ecu=[ + api_types.StatusResponseEcu( + ecu_id="p2", + status=api_types.Status( + status=api_types.StatusOta.SUCCESS, + ), + ) + ], + ), + ], + # expected overal ECUs status report set by on_ecus_accept_update_request, + { + "lost_ecus_id": set(), + "in_update_ecus_id": {"autoware"}, + "in_update_child_ecus_id": set(), + "failed_ecus_id": set(), + "success_ecus_id": {"p1", "p2"}, + }, + # ecu_status_flags + { + "any_child_ecu_in_update": False, "any_requires_network": True, "all_success": False, }, @@ -510,7 +558,7 @@ async def test_overall_ecu_status_report_generation( }, # ecu_status_flags { - "any_in_update": True, + "any_child_ecu_in_update": True, "any_requires_network": True, "all_success": False, }, @@ -562,7 +610,7 @@ async def test_overall_ecu_status_report_generation( }, # ecu_status_flags { - "any_in_update": True, + "any_child_ecu_in_update": True, "any_requires_network": True, "all_success": False, }, @@ -604,15 +652,18 @@ async def test_on_receive_update_request( for k, v in flags_status.items(): assert getattr(self.ecu_status_flags, k).is_set() == v - async def test_polling_waiter_switching_from_idling_to_active(self): + async def test_polling_waiter_switching_from_idling_to_active( + self, mocker: pytest_mock.MockerFixture + ): """Waiter should immediately return if active_ota_update_present is set.""" _sleep_time, _mocked_interval = 3, 60 + mocker.patch(f"{ECU_STATUS_MODULE}.IDLE_POLLING_INTERVAL", _mocked_interval) + async def _event_setter(): await asyncio.sleep(_sleep_time) - self.ecu_status_flags.any_in_update.set() + await self.ecu_storage.on_ecus_accept_update_request({"autoware"}) - self.ecu_status_flags.any_in_update.clear() _waiter = self.ecu_storage.get_polling_waiter() asyncio.create_task(_event_setter()) # waiter should return on active_ota_update_present is set, instead of waiting the diff --git a/tests/test_otaclient/test_ota_core.py b/tests/test_otaclient/test_ota_core.py index ffaab77b0..397e59098 100644 --- a/tests/test_otaclient/test_ota_core.py +++ b/tests/test_otaclient/test_ota_core.py @@ -161,7 +161,7 @@ def test_otaupdater( ) -> None: _, report_queue = ota_status_collector ecu_status_flags = mocker.MagicMock() - ecu_status_flags.any_requires_network.is_set = mocker.MagicMock( + ecu_status_flags.any_child_ecu_in_update.is_set = mocker.MagicMock( return_value=False ) @@ -202,7 +202,7 @@ def test_otaupdater( assert _downloaded_files_size == self._delta_bundle.total_download_files_size # assert the control_flags has been waited - ecu_status_flags.any_requires_network.is_set.assert_called_once() + ecu_status_flags.any_child_ecu_in_update.is_set.assert_called_once() assert _updater.update_version == str(cfg.UPDATE_VERSION) @@ -235,7 +235,7 @@ def mock_setup( ): _, status_report_queue = ota_status_collector ecu_status_flags = mocker.MagicMock() - ecu_status_flags.any_requires_network.is_set = mocker.MagicMock( + ecu_status_flags.any_child_ecu_in_update.is_set = mocker.MagicMock( return_value=False )