From 57e55ad99e96ddb874dc4ed61fbff5f3154aafd4 Mon Sep 17 00:00:00 2001 From: Bodong Yang <86948717+Bodong-Yang@users.noreply.github.com> Date: Thu, 24 Oct 2024 19:47:55 +0900 Subject: [PATCH] feat!: pre-load CA certification chains on otaclient starts up, reject OTA if no CA certs are installed (#408) This PR resolves an issue that comes from very old version of otaclient(since v1.0.0), which results in OTA image sign cert verification being skipped when no valid CA cert chains can be imported by otaclient. Also the logic of loading and managing the CA cert chains are split away from ota_metadata.legacy package and becomes a standalone module in ota_metadata.utils.cert_store. BREAKING CHANGE: Now otaclient will explicitly reject OTA when otaclient is not installed with valid CA cert chains. Also otaclient now only loads the CA cert chains once at startup. Re-configuring CA chains now requires otaclient restart. --- docker/test_base/Dockerfile | 39 ++++----- docker/test_base/entry_point.sh | 8 -- src/ota_metadata/legacy/parser.py | 77 +++++++---------- src/ota_metadata/utils/__init__.py | 13 +++ src/ota_metadata/utils/cert_store.py | 93 ++++++++++++++++++++ src/otaclient/app/ota_client.py | 27 +++++- tests/conftest.py | 1 + tests/keys/gen_certs.sh | 25 ++++-- tests/test_ota_metadata/test_ca_store.py | 95 +++++++++++++++++++++ tests/test_ota_metadata/test_legacy.py | 36 ++++---- tests/test_otaclient/test_create_standby.py | 13 ++- tests/test_otaclient/test_ota_client.py | 22 ++++- 12 files changed, 342 insertions(+), 107 deletions(-) create mode 100644 src/ota_metadata/utils/__init__.py create mode 100644 src/ota_metadata/utils/cert_store.py create mode 100644 tests/test_ota_metadata/test_ca_store.py diff --git a/docker/test_base/Dockerfile b/docker/test_base/Dockerfile index bcc63aa6f..5c32b129c 100644 --- a/docker/test_base/Dockerfile +++ b/docker/test_base/Dockerfile @@ -4,7 +4,7 @@ ARG UBUNTU_BASE # ------ stage 1: prepare base image ------ # # -FROM ${UBUNTU_BASE} AS builder +FROM ${UBUNTU_BASE} AS image_build SHELL ["/bin/bash", "-c"] ENV DEBIAN_FRONTEND=noninteractive @@ -12,10 +12,9 @@ ENV SPECIAL_FILE="path;adf.ae?qu.er\y=str#fragファイルement" # special treatment to the ota-image: create file that needs url escaping # NOTE: include special identifiers #?; into the pathname -RUN echo -n "${SPECIAL_FILE}" > "${OTA_IMAGE_DIR}/${SPECIAL_FILE}" - -# install required packages RUN set -eux; \ + echo -n "${SPECIAL_FILE}" > "/${SPECIAL_FILE}"; \ + # install required packages apt-get update -qq; \ apt-get install -y linux-image-generic; \ apt-get clean; \ @@ -44,26 +43,24 @@ ENV SPECIAL_FILE="path;adf.ae?qu.er\y=str#fragファイルement" RUN set -eux; \ apt-get update -qq; \ apt-get install -y -qq --no-install-recommends \ + gcc \ + git \ + libcurl4-openssl-dev \ + libssl-dev \ + python3-dev \ python3-minimal \ python3-pip \ python3-venv \ - python3-dev \ - libcurl4-openssl-dev \ - libssl-dev \ - gcc \ - wget \ - git; \ + wget; \ apt-get install -y -qq linux-image-generic; \ - apt-get clean - -# install hatch -RUN set -eux; \ + apt-get clean; \ + # install hatch python3 -m pip install --no-cache-dir -q -U pip; \ python3 -m pip install --no-cache-dir -U hatch WORKDIR ${OTA_IMAGE_SERVER_ROOT} -COPY --from=builder / /${OTA_IMAGE_DIR} +COPY --from=image_build / /${OTA_IMAGE_DIR} # generate test certs and sign key COPY --chmod=755 ./tests/keys/gen_certs.sh /tmp/certs/ @@ -72,10 +69,8 @@ RUN set -eu; \ pushd /tmp/certs; \ ./gen_certs.sh; \ cp ./* "${CERTS_DIR}"; \ - popd - -# build the test OTA image -RUN set -eu; \ + popd; \ + # build the test OTA image cp "${CERTS_DIR}"/sign.key sign.key; \ cp "${CERTS_DIR}"/sign.pem sign.pem; \ git clone ${OTA_METADATA_REPO}; \ @@ -96,10 +91,8 @@ RUN set -eu; \ --persistent-file ota-metadata/metadata/persistents.txt \ --rootfs-directory data \ --compressed-rootfs-directory data.zst; \ - cp ota-metadata/metadata/persistents.txt . - -# cleanup -RUN set -eux; \ + cp ota-metadata/metadata/persistents.txt .; \ + # cleanup apt-get clean; \ rm -rf \ /tmp/* \ diff --git a/docker/test_base/entry_point.sh b/docker/test_base/entry_point.sh index 014336a64..4416da4cc 100644 --- a/docker/test_base/entry_point.sh +++ b/docker/test_base/entry_point.sh @@ -4,18 +4,10 @@ set -eu TEST_ROOT=/test_root OTACLIENT_SRC=/otaclient_src OUTPUT_DIR="${OUTPUT_DIR:-/test_result}" -CERTS_DIR="${CERTS_DIR:-/certs}" # copy the source code as source is read-only cp -R "${OTACLIENT_SRC}" "${TEST_ROOT}" -# copy the certs generated in the docker image to otaclient dir -echo "setup certificates for testing..." -cd ${TEST_ROOT} -mkdir -p ./certs -cp -av ${CERTS_DIR}/root.pem ./certs/1.root.pem -cp -av ${CERTS_DIR}/interm.pem ./certs/1.interm.pem - # exec the input params echo "execute test with coverage" cd ${TEST_ROOT} diff --git a/src/ota_metadata/legacy/parser.py b/src/ota_metadata/legacy/parser.py index 893dafee1..ea69b486e 100644 --- a/src/ota_metadata/legacy/parser.py +++ b/src/ota_metadata/legacy/parser.py @@ -47,7 +47,6 @@ import shutil import time from dataclasses import dataclass, fields -from functools import partial from os import PathLike from pathlib import Path from tempfile import NamedTemporaryFile, TemporaryDirectory @@ -71,6 +70,7 @@ from OpenSSL import crypto from typing_extensions import Self +from ota_metadata.utils.cert_store import CACertChainStore, load_cert_in_pem from ota_proxy import OTAFileCacheControl from otaclient_common.common import urljoin_ensure_base from otaclient_common.downloader import Downloader @@ -245,8 +245,8 @@ class _MetadataJWTParser: HASH_ALG = "sha256" - def __init__(self, metadata_jwt: str, *, certs_dir: Union[str, Path]): - self.cert_dir = Path(certs_dir) + def __init__(self, metadata_jwt: str, *, ca_chains_store: CACertChainStore): + self.ca_chains_store = ca_chains_store # pre_parse metadata_jwt jwt_list = metadata_jwt.split(".") @@ -267,46 +267,32 @@ def __init__(self, metadata_jwt: str, *, certs_dir: Union[str, Path]): self.ota_metadata = _MetadataJWTClaimsLayout.parse_payload(self.payload_bytes) logger.info(f"metadata={self.ota_metadata!r}") - def _verify_metadata_cert(self, metadata_cert: bytes) -> None: + def verify_metadata_cert(self, metadata_cert: bytes) -> None: """Verify the metadata's sign certificate against local pinned CA. Raises: Raise MetadataJWTVerificationFailed on verification failed. """ - ca_set_prefix = set() - # e.g. under _certs_dir: A.1.pem, A.2.pem, B.1.pem, B.2.pem - for cert in self.cert_dir.glob("*.*.pem"): - if m := re.match(r"(.*)\..*.pem", cert.name): - ca_set_prefix.add(m.group(1)) - else: - raise MetadataJWTVerificationFailed("no pem file is found") - if len(ca_set_prefix) == 0: - logger.warning("there is no root or intermediate certificate") - return - logger.info(f"certs prefixes {ca_set_prefix}") - - load_pem = partial(crypto.load_certificate, crypto.FILETYPE_PEM) + if not self.ca_chains_store: + _err_msg = "CA chains store is empty!!! immediately fail the verification" + logger.error(_err_msg) + raise MetadataJWTVerificationFailed(_err_msg) try: - cert_to_verify = load_pem(metadata_cert) + cert_to_verify = load_cert_in_pem(metadata_cert) except crypto.Error as e: _err_msg = f"invalid certificate {metadata_cert}: {e!r}" logger.exception(_err_msg) raise MetadataJWTVerificationFailed(_err_msg) from e - for ca_prefix in sorted(ca_set_prefix): - certs_list = [ - self.cert_dir / c.name for c in self.cert_dir.glob(f"{ca_prefix}.*.pem") - ] - - store = crypto.X509Store() - for c in certs_list: - logger.info(f"cert {c}") - store.add_cert(load_pem(c.read_bytes())) - + # verify the OTA image cert against CA cert chain store + for ca_prefix, ca_chain in self.ca_chains_store.items(): try: - store_ctx = crypto.X509StoreContext(store, cert_to_verify) - store_ctx.verify_certificate() + crypto.X509StoreContext( + store=ca_chain, + certificate=cert_to_verify, + ).verify_certificate() + logger.info(f"verfication succeeded against: {ca_prefix}") return except crypto.X509StoreContextError as e: @@ -316,7 +302,7 @@ def _verify_metadata_cert(self, metadata_cert: bytes) -> None: logger.error(_err_msg) raise MetadataJWTVerificationFailed(_err_msg) - def _verify_metadata(self, metadata_cert: bytes): + def verify_metadata_signature(self, metadata_cert: bytes): """Verify metadata against sign certificate. Raises: @@ -344,17 +330,6 @@ def get_otametadata(self) -> "_MetadataJWTClaimsLayout": """ return self.ota_metadata - def verify_metadata(self, metadata_cert: bytes): - """Verify metadata_jwt against metadata cert and local pinned CA certs. - - Raises: - Raise MetadataJWTVerificationFailed on verification failed. - """ - # step1: verify the cert itself against local pinned CA cert - self._verify_metadata_cert(metadata_cert) - # step2: verify the metadata against input metadata_cert - self._verify_metadata(metadata_cert) - # place holder for unset must field in _MetadataJWTClaimsLayout _MUST_SET_CLAIM = object() @@ -615,13 +590,18 @@ def __init__( url_base: str, downloader: Downloader, run_dir: Path, - certs_dir: Path, + ca_chains_store: CACertChainStore, retry_interval: int = 1, ) -> None: + if not ca_chains_store: + _err_msg = "CA chains store is empty!!! immediately fail the verification" + logger.error(_err_msg) + raise MetadataJWTVerificationFailed(_err_msg) + self.url_base = url_base self._downloader = downloader self.run_dir = run_dir - self.certs_dir = certs_dir + self.ca_chains_store = ca_chains_store self.retry_interval = retry_interval self._tmp_dir = TemporaryDirectory(prefix="ota_metadata", dir=run_dir) self._tmp_dir_path = Path(self._tmp_dir.name) @@ -673,7 +653,8 @@ def _process_metadata_jwt(self) -> _MetadataJWTClaimsLayout: time.sleep(self.retry_interval) _parser = _MetadataJWTParser( - _downloaded_meta_f.read_text(), certs_dir=self.certs_dir + _downloaded_meta_f.read_text(), + ca_chains_store=self.ca_chains_store, ) # get not yet verified parsed ota_metadata _ota_metadata = _parser.get_otametadata() @@ -700,7 +681,11 @@ def _process_metadata_jwt(self) -> _MetadataJWTClaimsLayout: except Exception as e: logger.warning(f"failed to download {cert_info}, retrying: {e!r}") time.sleep(self.retry_interval) - _parser.verify_metadata(cert_file.read_bytes()) + + cert_bytes = cert_file.read_bytes() + + _parser.verify_metadata_cert(cert_bytes) + _parser.verify_metadata_signature(cert_bytes) # return verified ota metadata return _ota_metadata diff --git a/src/ota_metadata/utils/__init__.py b/src/ota_metadata/utils/__init__.py new file mode 100644 index 000000000..bcfd866ad --- /dev/null +++ b/src/ota_metadata/utils/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/ota_metadata/utils/cert_store.py b/src/ota_metadata/utils/cert_store.py new file mode 100644 index 000000000..ca33ea8a9 --- /dev/null +++ b/src/ota_metadata/utils/cert_store.py @@ -0,0 +1,93 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Implementation of otaclient CA cert chain store.""" + + +from __future__ import annotations + +import logging +import re +from functools import partial +from pathlib import Path +from typing import Dict + +from OpenSSL import crypto + +from otaclient_common.typing import StrOrPath + +logger = logging.getLogger(__name__) + +# we search the CA chains with the following naming schema: +# ..pem, +# in which should match regex pattern `[\w\-]+` +# e.g: +# dev chain: dev.intermediate.pem, dev.root.pem +# prd chain: prd.intermediate.pem, prd.intermediate.pem +CERT_NAME_PA = re.compile(r"(?P[\w\-]+)\.[\w\-]+\.pem") + + +class CACertStoreInvalid(Exception): ... + + +class CACertChainStore(Dict[str, crypto.X509Store]): + """A dict-like type that stores CA chain name and CA store mapping.""" + + +load_cert_in_pem = partial(crypto.load_certificate, crypto.FILETYPE_PEM) + + +def load_ca_cert_chains(cert_dir: StrOrPath) -> CACertChainStore: + """Load CA cert chains from . + + Raises: + CACertStoreInvalid on failed import. + + Returns: + A dict + """ + cert_dir = Path(cert_dir) + + ca_set_prefix = set() + for cert in cert_dir.glob("*.pem"): + if m := CERT_NAME_PA.match(cert.name): + ca_set_prefix.add(m.group("chain")) + else: + _err_msg = f"detect invalid named certs: {cert.name}" + logger.warning(_err_msg) + + if not ca_set_prefix: + _err_msg = f"no CA cert chains found to be installed under the {cert_dir}!!!" + logger.error(_err_msg) + raise CACertStoreInvalid(_err_msg) + + logger.info(f"found installed CA chains: {ca_set_prefix}") + + ca_chains = CACertChainStore() + for ca_prefix in sorted(ca_set_prefix): + try: + ca_chain = crypto.X509Store() + for c in cert_dir.glob(f"{ca_prefix}.*.pem"): + ca_chain.add_cert(load_cert_in_pem(c.read_bytes())) + ca_chains[ca_prefix] = ca_chain + except Exception as e: + _err_msg = f"failed to load CA chain {ca_prefix}: {e!r}" + logger.warning(_err_msg) + + if not ca_chains: + _err_msg = "all found CA chains are invalid, no CA chain is imported!!!" + logger.error(_err_msg) + raise CACertStoreInvalid(_err_msg) + + logger.info(f"loaded CA cert chains: {list(ca_chains)}") + return ca_chains diff --git a/src/otaclient/app/ota_client.py b/src/otaclient/app/ota_client.py index ca3af8ce5..51e0c256c 100644 --- a/src/otaclient/app/ota_client.py +++ b/src/otaclient/app/ota_client.py @@ -36,6 +36,11 @@ from ota_metadata.legacy import parser as ota_metadata_parser from ota_metadata.legacy import types as ota_metadata_types +from ota_metadata.utils.cert_store import ( + CACertChainStore, + CACertStoreInvalid, + load_ca_cert_chains, +) from otaclient import __version__ from otaclient.boot_control import BootControllerProtocol, get_boot_controller from otaclient.create_standby import ( @@ -172,12 +177,15 @@ def __init__( version: str, raw_url_base: str, cookies_json: str, + ca_chains_store: CACertChainStore, upper_otaproxy: str | None = None, boot_controller: BootControllerProtocol, create_standby_cls: Type[StandbySlotCreatorProtocol], control_flags: OTAClientControlFlags, status_query_interval: int = DEFAULT_STATUS_QUERY_INTERVAL, ) -> None: + self.ca_chains_store = ca_chains_store + self._shutdown = False self._update_status = api_types.UpdateStatus() self._last_status_query_timestamp = 0 @@ -399,7 +407,7 @@ def _execute_update(self): url_base=self.url_base, downloader=self._downloader_pool.get_instance(), run_dir=Path(cfg.RUN_DIR), - certs_dir=Path(cfg.CERTS_DIR), + ca_chains_store=self.ca_chains_store, ) self.total_files_num = otameta.total_files_num self.total_files_size_uncompressed = otameta.total_files_size_uncompressed @@ -609,6 +617,15 @@ def __init__( self.last_failure_type = api_types.FailureType.NO_FAILURE self.last_failure_reason = "" self.last_failure_traceback = "" + + try: + self.ca_chains_store = load_ca_cert_chains(cfg.CERTS_DIR) + except CACertStoreInvalid as e: + _err_msg = f"failed to import ca_chains_store: {e!r}, OTA will NOT occur on no CA chains installed!!!" + logger.error(_err_msg) + + self.ca_chains_store = CACertChainStore() + except Exception as e: _err_msg = f"failed to start otaclient core: {e!r}" logger.error(_err_msg) @@ -633,10 +650,18 @@ def _on_failure(self, exc: ota_errors.OTAError, ota_status: api_types.StatusOta) def update(self, version: str, url_base: str, cookies_json: str) -> None: try: logger.info("[update] entering local update...") + + if not self.ca_chains_store: + raise ota_errors.MetadataJWTVerficationFailed( + "no CA chains are installed, reject any OTA update", + module=__name__, + ) + self._update_executor = _OTAUpdater( version=version, raw_url_base=url_base, cookies_json=cookies_json, + ca_chains_store=self.ca_chains_store, boot_controller=self.boot_controller, create_standby_cls=self.create_standby_cls, control_flags=self.control_flags, diff --git a/tests/conftest.py b/tests/conftest.py index 6b374311a..e8caba824 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -61,6 +61,7 @@ class TestConfiguration: # dummy ota-image setting OTA_IMAGE_DIR = "/ota-image" + CERTS_DIR = "/certs" OTA_IMAGE_SERVER_ADDR = "127.0.0.1" OTA_IMAGE_SERVER_PORT = 8080 OTA_IMAGE_URL = f"http://{OTA_IMAGE_SERVER_ADDR}:{OTA_IMAGE_SERVER_PORT}" diff --git a/tests/keys/gen_certs.sh b/tests/keys/gen_certs.sh index 0506527ad..fd1b6455a 100644 --- a/tests/keys/gen_certs.sh +++ b/tests/keys/gen_certs.sh @@ -4,12 +4,14 @@ set -eux +CA_CHAIN_PREFIX=${1:-test} + # Root CA: openssl ecparam -out root.key -name prime256v1 -genkey openssl req -new -x509 \ -days $((365 * 10 + 5)) \ -key root.key \ - -out root.pem \ + -out ${CA_CHAIN_PREFIX}.root.pem \ -sha256 \ -subj "/C=JP/ST=Tokyo/O=Tier4/CN=root.tier4.jp" @@ -30,14 +32,23 @@ basicConstraints = critical, CA:true, pathlen:0 openssl x509 -req \ -days $((365 * 10 + 5)) \ -in interm.csr \ - -CA root.pem \ + -CA ${CA_CHAIN_PREFIX}.root.pem \ -CAkey root.key \ - -out interm.pem \ + -out ${CA_CHAIN_PREFIX}.interm.pem \ -sha256 -CAcreateserial \ -extfile <(echo "${CA_INTERM_EXT}") \ -extensions v3_intermediate_ca -# End user +# Sign cert +SIGN_CERT_EXT=" +[ sign_cert ] +subjectKeyIdentifier = hash +authorityKeyIdentifier = keyid:always,issuer +keyUsage = critical, digitalSignature +extendedKeyUsage = codeSigning +basicConstraints = critical, CA:FALSE +" + openssl ecparam -out sign.key -name prime256v1 -genkey openssl req -new \ -key sign.key \ @@ -48,9 +59,11 @@ openssl req -new \ openssl x509 -req \ -days 365 \ -in sign.csr \ - -CA interm.pem \ + -CA ${CA_CHAIN_PREFIX}.interm.pem \ -CAkey interm.key \ -out sign.pem \ - -sha256 -CAcreateserial + -sha256 -CAcreateserial \ + -extfile <(echo "${SIGN_CERT_EXT}") \ + -extensions sign_cert rm -f root.key interm.key interm.csr sign.csr *.srl diff --git a/tests/test_ota_metadata/test_ca_store.py b/tests/test_ota_metadata/test_ca_store.py new file mode 100644 index 000000000..104ad9977 --- /dev/null +++ b/tests/test_ota_metadata/test_ca_store.py @@ -0,0 +1,95 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +import shutil +import subprocess +from pathlib import Path + +import pytest +from OpenSSL import crypto + +from ota_metadata.utils.cert_store import ( + CACertStoreInvalid, + load_ca_cert_chains, + load_cert_in_pem, +) +from tests.conftest import TEST_DIR +from tests.conftest import TestConfiguration as cfg + +GEN_CERTS_SCRIPT = TEST_DIR / "keys" / "gen_certs.sh" +TEST_BASE_SIGN_PEM = Path(cfg.CERTS_DIR) / "sign.pem" + + +@pytest.fixture +def setup_ca_chain(tmp_path: Path) -> tuple[str, Path, Path]: + """Create the certs dir and generate certs. + + Returns: + A tuple of chain prefix, sign cert path and certs dir. + """ + certs_dir = tmp_path / "certs" + certs_dir.mkdir(parents=True, exist_ok=True) + + shutil.copy(GEN_CERTS_SCRIPT, certs_dir) + gen_certs_script = certs_dir / GEN_CERTS_SCRIPT.name + + chain = "test_chain" + subprocess.run( + [ + "bash", + str(gen_certs_script), + chain, + ], + cwd=certs_dir, + ) + return chain, certs_dir / "sign.pem", certs_dir + + +def test_ca_store(setup_ca_chain: tuple[str, Path, Path]): + ca_chain, sign_pem, certs_dir = setup_ca_chain + + ca_store = load_ca_cert_chains(certs_dir) + + # verification should fail with wrong cert signed by other chain. + with pytest.raises(crypto.X509StoreContextError): + crypto.X509StoreContext( + store=ca_store[ca_chain], + certificate=load_cert_in_pem(TEST_BASE_SIGN_PEM.read_bytes()), + ).verify_certificate() + + # verification should succeed with proper chain and corresponding sign cert. + crypto.X509StoreContext( + store=ca_store[ca_chain], + certificate=load_cert_in_pem(sign_pem.read_bytes()), + ).verify_certificate() + + +def test_ca_store_empty(tmp_path: Path): + with pytest.raises(CACertStoreInvalid): + load_ca_cert_chains(tmp_path) + + +def test_ca_store_invalid(tmp_path: Path): + # create invalid certs under tmp_path + root_cert = tmp_path / "test.root.pem" + intermediate = tmp_path / "test.intermediate.pem" + + root_cert.write_text("abcdef") + intermediate.write_text("123456") + + with pytest.raises(CACertStoreInvalid): + load_ca_cert_chains(tmp_path) diff --git a/tests/test_ota_metadata/test_legacy.py b/tests/test_ota_metadata/test_legacy.py index 74e550a8f..2f730504d 100644 --- a/tests/test_ota_metadata/test_legacy.py +++ b/tests/test_ota_metadata/test_legacy.py @@ -36,6 +36,7 @@ parse_regulars_from_txt, parse_symlinks_from_txt, ) +from ota_metadata.utils.cert_store import load_ca_cert_chains from tests.conftest import TEST_DIR GEN_CERTS_SCRIPT = TEST_DIR / "keys" / "gen_certs.sh" @@ -190,17 +191,13 @@ def certs_dirs(tmp_path: Path) -> CertsDirs: for chain in ["chain_a", "chain_b"]: chain_dir = certs_dir / chain chain_dir.mkdir() - chain_interm_cert = chain_dir / f"{chain}.intermediate.pem" + chain_interm_cert = chain_dir / f"{chain}.interm.pem" chain_root_cert = chain_dir / f"{chain}.root.pem" subprocess.run( - [str(gen_certs_script)], + [str(gen_certs_script), chain], cwd=chain_dir, ) - interm_cert = chain_dir / "interm.pem" - root_cert = chain_dir / "root.pem" - shutil.move(str(interm_cert), chain_interm_cert) - shutil.move(str(root_cert), chain_root_cert) shutil.copy(chain_interm_cert, multi_chain_dir) shutil.copy(chain_root_cert, multi_chain_dir) @@ -220,8 +217,10 @@ def test_ota_metadata(payload_str: str, certs_dirs: CertsDirs): sign_pem = chain_b / "sign.pem" sign_key = chain_b / "sign.key" + ca_store = load_ca_cert_chains(certs_dir) + metadata_jwt = generate_jwt(payload_str, sign_key) - parser = _MetadataJWTParser(metadata_jwt, certs_dir=str(certs_dir)) + parser = _MetadataJWTParser(metadata_jwt, ca_chains_store=ca_store) metadata = parser.get_otametadata() assert asdict(metadata.directory) == DIR_INFO assert asdict(metadata.symboliclink) == SYMLINK_INFO @@ -230,7 +229,8 @@ def test_ota_metadata(payload_str: str, certs_dirs: CertsDirs): assert asdict(metadata.certificate) == CERTIFICATE_INFO assert metadata.rootfs_directory == ROOTFS_DIR_INFO - parser.verify_metadata(sign_pem.read_bytes()) + parser.verify_metadata_cert(sign_pem.read_bytes()) + parser.verify_metadata_signature(sign_pem.read_bytes()) if "total_regular_size" in payload_str: assert metadata.total_regular_size == TOTAL_REGULAR_SIZE else: @@ -251,11 +251,13 @@ def test_ota_metadata_with_verify_certificate_exception( chain_a, chain_b = certs_dirs["chain_a"], certs_dirs["chain_b"] chain_a_sign_key = chain_a / "sign.key" + ca_store = load_ca_cert_chains(chain_b) metadata_jwt = generate_jwt(payload_str, chain_a_sign_key) # use chain_b to verify chain_a's sign cert with pytest.raises(MetadataJWTVerificationFailed): - parser = _MetadataJWTParser(metadata_jwt, certs_dir=str(chain_b)) - parser.verify_metadata((chain_a / "sign.pem").read_bytes()) + parser = _MetadataJWTParser(metadata_jwt, ca_chains_store=ca_store) + parser.verify_metadata_cert((chain_a / "sign.pem").read_bytes()) + parser.verify_metadata_signature((chain_a / "sign.pem").read_bytes()) @pytest.mark.parametrize( @@ -270,10 +272,12 @@ def test_invalid_metadata_jwt(payload_str: str, certs_dirs: CertsDirs): sign_pem = certs_dir / "sign.pem" sign_key = certs_dir / "sign.key" + ca_store = load_ca_cert_chains(certs_dir) with pytest.raises(MetadataJWTPayloadInvalid): metadata_jwt = generate_jwt(payload_str, sign_key) - parser = _MetadataJWTParser(metadata_jwt, certs_dir=str(certs_dir)) - parser.verify_metadata(sign_pem.read_bytes()) + parser = _MetadataJWTParser(metadata_jwt, ca_chains_store=ca_store) + parser.verify_metadata_cert(sign_pem.read_bytes()) + parser.verify_metadata_signature(sign_pem.read_bytes()) # ------ text based ota metafiles parsing test ------ # @@ -385,7 +389,7 @@ def test_invalid_metadata_jwt(payload_str: str, certs_dirs: CertsDirs): ), ), ) -def test_RegularInf( +def test_regulars_txt( _input: str, mode: int, uid: int, @@ -421,7 +425,7 @@ def test_RegularInf( ), ), ) -def test_DirectoryInf(_input: str, mode: int, uid: int, gid: int, path: str): +def test_dirs_txt(_input: str, mode: int, uid: int, gid: int, path: str): entry = parse_dirs_from_txt(_input) assert entry.mode == mode @@ -444,7 +448,7 @@ def test_DirectoryInf(_input: str, mode: int, uid: int, gid: int, path: str): ), ), ) -def test_SymbolicLinkInf( +def test_symlinks_txt( _input: str, mode: int, uid: int, gid: int, link: str, target: str ): entry = parse_symlinks_from_txt(_input) @@ -465,6 +469,6 @@ def test_SymbolicLinkInf( ), ), ) -def test_PersistentInf(_input: str, path: str): +def test_persistent_txt(_input: str, path: str): entry = parse_persistents_from_txt(_input) assert entry.path == path diff --git a/tests/test_otaclient/test_create_standby.py b/tests/test_otaclient/test_create_standby.py index e624159fd..a939457f9 100644 --- a/tests/test_otaclient/test_create_standby.py +++ b/tests/test_otaclient/test_create_standby.py @@ -22,6 +22,7 @@ import pytest from pytest_mock import MockerFixture +from ota_metadata.utils.cert_store import load_ca_cert_chains from otaclient import create_standby from otaclient.app.configs import BaseConfig from otaclient.app.configs import config as otaclient_cfg @@ -36,7 +37,7 @@ MODULE = create_standby.__name__ -class Test_OTAupdate_with_create_standby_RebuildMode: +class TestOTAupdateWithCreateStandbyRebuildMode: """ NOTE: the boot_control is mocked, only testing create_standby and the logics directly implemented by OTAUpdater @@ -74,7 +75,7 @@ def mock_setup(self, mocker: MockerFixture, prepare_ab_slots): self._boot_control = typing.cast( BootControllerProtocol, mocker.MagicMock(spec=BootControllerProtocol) ) - self._boot_control.get_standby_boot_dir.return_value = self.slot_b_boot_dir + self._boot_control.get_standby_boot_dir.return_value = self.slot_b_boot_dir # type: ignore # ------ mock otaclient cfg ------ # _cfg = BaseConfig() @@ -84,15 +85,19 @@ def mock_setup(self, mocker: MockerFixture, prepare_ab_slots): mocker.patch(f"{cfg.OTACLIENT_MODULE_PATH}.cfg", _cfg) mocker.patch(f"{MODULE}.rebuild_mode.cfg", _cfg) - def test_update_with_create_standby_RebuildMode(self, mocker: MockerFixture): + def test_update_with_rebuild_mode(self, mocker: MockerFixture): # ------ execution ------ # otaclient_control_flags = typing.cast( OTAClientControlFlags, mocker.MagicMock(spec=OTAClientControlFlags) ) + + ca_store = load_ca_cert_chains(cfg.CERTS_DIR) + _updater = _OTAUpdater( version=cfg.UPDATE_VERSION, raw_url_base=cfg.OTA_IMAGE_URL, cookies_json=r'{"test": "my-cookie"}', + ca_chains_store=ca_store, upper_otaproxy=None, boot_controller=self._boot_control, create_standby_cls=RebuildMode, @@ -111,7 +116,7 @@ def test_update_with_create_standby_RebuildMode(self, mocker: MockerFixture): persist_handler.assert_called_once() # --- assert update finished _updater.shutdown.assert_called_once() - otaclient_control_flags.wait_can_reboot_flag.assert_called_once() + otaclient_control_flags.wait_can_reboot_flag.assert_called_once() # type: ignore # --- ensure the update stats are collected collector = _updater._update_stats_collector assert collector.processed_files_num diff --git a/tests/test_otaclient/test_ota_client.py b/tests/test_otaclient/test_ota_client.py index 52bfe4ae9..4cebecb8d 100644 --- a/tests/test_otaclient/test_ota_client.py +++ b/tests/test_otaclient/test_ota_client.py @@ -29,6 +29,7 @@ from ota_metadata.legacy.parser import parse_dirs_from_txt, parse_regulars_from_txt from ota_metadata.legacy.types import DirectoryInf, RegularInf +from ota_metadata.utils.cert_store import load_ca_cert_chains from otaclient.app.configs import config as otaclient_cfg from otaclient.app.errors import OTAErrorRecoverable from otaclient.app.ota_client import ( @@ -47,7 +48,19 @@ from tests.utils import SlotMeta -class Test_OTAUpdater: +@pytest.fixture(autouse=True, scope="module") +def mock_certs_dir(module_mocker: pytest_mock.MockerFixture): + """Mock to use the certs from the OTA test base image.""" + from otaclient.app.ota_client import cfg as _cfg + + module_mocker.patch.object( + _cfg, + "CERTS_DIR", + cfg.CERTS_DIR, + ) + + +class TestOTAUpdater: """ NOTE: the boot_control and create_standby are mocked, only testing the logics directly implemented by OTAUpdater @@ -156,17 +169,20 @@ def mock_setup(self, mocker: pytest_mock.MockerFixture, _delta_generate): f"{cfg.OTACLIENT_MODULE_PATH}.OTAUpdateStatsCollector", mocker.MagicMock() ) - def test_OTAUpdater(self, mocker: pytest_mock.MockerFixture): + def test_otaupdater(self, mocker: pytest_mock.MockerFixture): from otaclient.app.ota_client import OTAClientControlFlags, _OTAUpdater # ------ execution ------ # otaclient_control_flags = typing.cast( OTAClientControlFlags, mocker.MagicMock(spec=OTAClientControlFlags) ) + ca_store = load_ca_cert_chains(cfg.CERTS_DIR) + _updater = _OTAUpdater( version=cfg.UPDATE_VERSION, raw_url_base=cfg.OTA_IMAGE_URL, cookies_json=r'{"test": "my-cookie"}', + ca_chains_store=ca_store, boot_controller=self._boot_control, upper_otaproxy=None, create_standby_cls=self._create_standby_cls, @@ -194,7 +210,7 @@ def test_OTAUpdater(self, mocker: pytest_mock.MockerFixture): process_persists_handler.assert_called_once() -class Test_OTAClient: +class TestOTAClient: """Testing on OTAClient workflow.""" OTACLIENT_VERSION = "otaclient_version"