Skip to content

Commit

Permalink
Merge branch 'main' into feat/csv_parser
Browse files Browse the repository at this point in the history
  • Loading branch information
Bodong-Yang authored Dec 26, 2024
2 parents 330a14f + 53c20b0 commit f48fb49
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 80 deletions.
10 changes: 9 additions & 1 deletion src/otaclient/_status_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,15 @@ def _status_collector_thread(self) -> None:
break

# ------ push status on load_report ------ #
if self.load_report(report) and self._status and _now > _next_shm_push:
# NOTE: always push OTAStatus change report
if (
self.load_report(report)
and self._status
and (
isinstance(report.payload, OTAStatusChangeReport)
or _now > _next_shm_push
)
):
try:
self._shm_status.write_msg(self._status)
_next_shm_push = _now + self.shm_push_interval
Expand Down
33 changes: 33 additions & 0 deletions src/otaclient/grpc/api_v2/servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import multiprocessing.queues as mp_queue
from concurrent.futures import ThreadPoolExecutor

import otaclient.configs.cfg as otaclient_cfg
from otaclient._types import (
IPCRequest,
IPCResEnum,
Expand Down Expand Up @@ -111,6 +112,22 @@ async def update(
update_acked_ecus = set()
response = api_types.UpdateResponse()

# NOTE(20241220): due to the fact that OTA Service API doesn't have field
# in UpdateResponseEcu msg, the only way to pass the failure_msg
# to upper is by status API.
if not otaclient_cfg.ECU_INFO_LOADED_SUCCESSFULLY:
logger.error(
"ecu_info.yaml is not loaded properly, reject any update request"
)
for _update_req in request.iter_ecu():
response.add_ecu(
api_types.UpdateResponseEcu(
ecu_id=_update_req.ecu_id,
result=api_types.FailureType.UNRECOVERABLE,
)
)
return response

# first: dispatch update request to all directly connected subECUs
tasks: dict[asyncio.Task, ECUContact] = {}
for ecu_contact in self.sub_ecus:
Expand Down Expand Up @@ -225,6 +242,22 @@ async def rollback(
logger.info(f"receive rollback request: {request}")
response = api_types.RollbackResponse()

# NOTE(20241220): due to the fact that OTA Service API doesn't have field
# in UpdateResponseEcu msg, the only way to pass the failure_msg
# to upper is by status API.
if not otaclient_cfg.ECU_INFO_LOADED_SUCCESSFULLY:
logger.error(
"ecu_info.yaml is not loaded properly, reject any rollback request"
)
for _rollback_req in request.iter_ecu():
response.add_ecu(
api_types.RollbackResponseEcu(
ecu_id=_rollback_req.ecu_id,
result=api_types.FailureType.UNRECOVERABLE,
)
)
return response

# first: dispatch rollback request to all directly connected subECUs
tasks: dict[asyncio.Task, ECUContact] = {}
for ecu_contact in self.sub_ecus:
Expand Down
69 changes: 54 additions & 15 deletions src/otaclient/ota_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import requests.exceptions as requests_exc
from requests import Response

import otaclient.configs.cfg as otaclient_cfg
from ota_metadata.legacy import parser as ota_metadata_parser
from ota_metadata.legacy import types as ota_metadata_types
from ota_metadata.utils.cert_store import (
Expand Down Expand Up @@ -66,7 +67,12 @@
)
from otaclient._utils import SharedOTAClientStatusWriter, get_traceback, wait_and_log
from otaclient.boot_control import BootControllerProtocol, get_boot_controller
from otaclient.configs.cfg import cfg, ecu_info, proxy_info
from otaclient.configs.cfg import (
ECU_INFO_LOADED_SUCCESSFULLY,
cfg,
ecu_info,
proxy_info,
)
from otaclient.create_standby import (
StandbySlotCreatorProtocol,
get_standby_slot_creator,
Expand Down Expand Up @@ -672,17 +678,7 @@ def __init__(
)
return

# load and report booted OTA status
_boot_ctrl_loaded_ota_status = self.boot_controller.get_booted_ota_status()
self._live_ota_status = _boot_ctrl_loaded_ota_status
status_report_queue.put_nowait(
StatusReport(
payload=OTAStatusChangeReport(
new_ota_status=_boot_ctrl_loaded_ota_status,
),
)
)

# ------ load firmware version ------ #
self.current_version = self.boot_controller.load_version()
status_report_queue.put_nowait(
StatusReport(
Expand All @@ -691,7 +687,9 @@ def __init__(
),
)
)
logger.info(f"firmware_version: {self.current_version}")

# ------ load CA store ------ #
self.ca_chains_store = None
try:
self.ca_chains_store = load_ca_cert_chains(cfg.CERT_DPATH)
Expand All @@ -701,9 +699,36 @@ def __init__(

self.ca_chains_store = CAChainStore()

self.started = True
logger.info("otaclient started")
logger.info(f"firmware_version: {self.current_version}")
# load and report booted OTA status
_boot_ctrl_loaded_ota_status = self.boot_controller.get_booted_ota_status()
if not otaclient_cfg.ECU_INFO_LOADED_SUCCESSFULLY:
logger.error(
"ecu_info.yaml file is not loaded properly, will reject any OTA request."
)
logger.error(f"set live_ota_status to {OTAStatus.FAILURE!r}")
self._live_ota_status = OTAStatus.FAILURE
status_report_queue.put_nowait(
StatusReport(
payload=OTAStatusChangeReport(
new_ota_status=OTAStatus.FAILURE,
failure_type=FailureType.UNRECOVERABLE,
failure_reason=f"ecu_info.yaml file is broken or missing, please check {cfg.ECU_INFO_FPATH}. "
"reject any OTA request.",
),
)
)
else:
self._live_ota_status = _boot_ctrl_loaded_ota_status
status_report_queue.put_nowait(
StatusReport(
payload=OTAStatusChangeReport(
new_ota_status=_boot_ctrl_loaded_ota_status,
),
)
)

self.started = True
logger.info("otaclient started")

def _on_failure(
self,
Expand Down Expand Up @@ -843,6 +868,20 @@ def main(
)
)

elif not self.started:
_err_msg = "reject OTA request due to otaclient is not (yet) started."
if not ECU_INFO_LOADED_SUCCESSFULLY:
_err_msg = f"reject OTA request due to {cfg.ECU_INFO_FPATH} missing or broken"

logger.error(_err_msg)
resp_queue.put_nowait(
IPCResponse(
res=IPCResEnum.REJECT_OTHER,
msg=_err_msg,
session_id=request.session_id,
)
)

elif isinstance(request, UpdateRequestV2):

_update_thread = threading.Thread(
Expand Down
104 changes: 59 additions & 45 deletions tests/test_otaclient/test_configs/test_ecu_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@


@pytest.mark.parametrize(
"ecu_info_yaml, expected_ecu_info",
"ecu_info_yaml, expected_res",
(
# --- case 1: invalid ecu_info --- #
# case 1.1: valid yaml(empty file), invalid ecu_info
(
"# this is an empty file",
DEFAULT_ECU_INFO,
(False, DEFAULT_ECU_INFO),
),
# case 1.2: valid yaml(array), invalid ecu_info
(
("- this is an\n- yaml file that\n- contains a array\n"),
DEFAULT_ECU_INFO,
(False, DEFAULT_ECU_INFO),
),
# case 1.2: invalid yaml
(
" - \n not a \n [ valid yaml",
DEFAULT_ECU_INFO,
(False, DEFAULT_ECU_INFO),
),
# --- case 2: single ECU --- #
# case 2.1: basic single ECU
Expand All @@ -52,10 +52,13 @@
'ip_addr: "192.168.1.1"\n'
"bootloader: jetson-cboot\n"
),
ECUInfo(
ecu_id="autoware",
ip_addr=IPv4Address("192.168.1.1"),
bootloader=BootloaderType.JETSON_CBOOT,
(
True,
ECUInfo(
ecu_id="autoware",
ip_addr=IPv4Address("192.168.1.1"),
bootloader=BootloaderType.JETSON_CBOOT,
),
),
),
# case 2.2: single ECU with bootloader type specified
Expand All @@ -66,10 +69,13 @@
'ip_addr: "192.168.1.1"\n'
'bootloader: "grub"\n'
),
ECUInfo(
ecu_id="autoware",
ip_addr=IPv4Address("192.168.1.1"),
bootloader=BootloaderType.GRUB,
(
True,
ECUInfo(
ecu_id="autoware",
ip_addr=IPv4Address("192.168.1.1"),
bootloader=BootloaderType.GRUB,
),
),
),
# --- case 3: multiple ECUs --- #
Expand All @@ -87,21 +93,24 @@
'- ecu_id: "p2"\n'
' ip_addr: "192.168.0.12"\n'
),
ECUInfo(
ecu_id="autoware",
ip_addr=IPv4Address("192.168.1.1"),
bootloader=BootloaderType.GRUB,
available_ecu_ids=["autoware", "p1", "p2"],
secondaries=[
ECUContact(
ecu_id="p1",
ip_addr=IPv4Address("192.168.0.11"),
),
ECUContact(
ecu_id="p2",
ip_addr=IPv4Address("192.168.0.12"),
),
],
(
True,
ECUInfo(
ecu_id="autoware",
ip_addr=IPv4Address("192.168.1.1"),
bootloader=BootloaderType.GRUB,
available_ecu_ids=["autoware", "p1", "p2"],
secondaries=[
ECUContact(
ecu_id="p1",
ip_addr=IPv4Address("192.168.0.11"),
),
ECUContact(
ecu_id="p2",
ip_addr=IPv4Address("192.168.0.12"),
),
],
),
),
),
# case 3.2: multiple ECUs, with main ECU's bootloader specified
Expand All @@ -118,32 +127,37 @@
'- ecu_id: "p2"\n'
' ip_addr: "192.168.0.12"\n'
),
ECUInfo(
ecu_id="autoware",
ip_addr=IPv4Address("192.168.1.1"),
bootloader=BootloaderType.JETSON_UEFI,
available_ecu_ids=["autoware", "p1", "p2"],
secondaries=[
ECUContact(
ecu_id="p1",
ip_addr=IPv4Address("192.168.0.11"),
),
ECUContact(
ecu_id="p2",
ip_addr=IPv4Address("192.168.0.12"),
),
],
(
True,
ECUInfo(
ecu_id="autoware",
ip_addr=IPv4Address("192.168.1.1"),
bootloader=BootloaderType.JETSON_UEFI,
available_ecu_ids=["autoware", "p1", "p2"],
secondaries=[
ECUContact(
ecu_id="p1",
ip_addr=IPv4Address("192.168.0.11"),
),
ECUContact(
ecu_id="p2",
ip_addr=IPv4Address("192.168.0.12"),
),
],
),
),
),
),
)
def test_ecu_info(tmp_path: Path, ecu_info_yaml: str, expected_ecu_info: ECUInfo):
def test_ecu_info(
tmp_path: Path, ecu_info_yaml: str, expected_res: tuple[bool, ECUInfo]
):
# --- preparation --- #
(ota_dir := tmp_path / "boot" / "ota").mkdir(parents=True, exist_ok=True)
(ecu_info_file := ota_dir / "ecu_info.yaml").write_text(ecu_info_yaml)

# --- execution --- #
_, loaded_ecu_info = parse_ecu_info(ecu_info_file)
res = parse_ecu_info(ecu_info_file)

# --- assertion --- #
assert loaded_ecu_info == expected_ecu_info
assert res == expected_res
Loading

0 comments on commit f48fb49

Please sign in to comment.