Skip to content

Commit

Permalink
feat: add otaclient_common.shm_status for sharing status between proc…
Browse files Browse the repository at this point in the history
…esses (#432)

This PR backports otaclient_common.shm_status module from the dev branch. This module achieves sharing status by pickled python object securely and efficiently with shared memory and hmac-sha512.

Other changes: 
1. tests: coverage: enable multiprocessing concurrency measurement.
2. tests: fix up test docker entrypoint as introducing multiprocessing measurement
  • Loading branch information
Bodong-Yang authored Nov 27, 2024
1 parent cd768c3 commit afc2f67
Show file tree
Hide file tree
Showing 5 changed files with 400 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/gen_requirements_txt.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
# For more details about this restriction, please refer to:
# https://github.com/peter-evans/create-pull-request/issues/48 and
# https://github.com/peter-evans/create-pull-request/blob/main/docs/concepts-guidelines.md#triggering-further-workflow-runs
ssh_key: ${{ secrets.DEPLOY_KEY }}
ssh-key: ${{ secrets.DEPLOY_KEY }}
persist-credentials: true

- name: setup python
Expand Down
1 change: 1 addition & 0 deletions docker/test_base/entry_point.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ echo "execute test with coverage"
cd ${TEST_ROOT}
hatch env create dev
hatch run dev:coverage run -m pytest --junit-xml=${OUTPUT_DIR}/pytest.xml ${@:-}
hatch run dev:coverage combine
hatch run dev:coverage xml -o ${OUTPUT_DIR}/coverage.xml
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,13 @@ extend-exclude = '''(
)'''

[tool.coverage.run]
concurrency = [
"multiprocessing",
"thread",
]
branch = false
relative_files = true
parallel = true
source = [
"otaclient",
"otaclient_api",
Expand Down
207 changes: 207 additions & 0 deletions src/otaclient_common/shm_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
# Copyright 2022 TIER IV, INC. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A lib for sharing status between processes.
shared memory layout:
rwlock(1byte) | hmac-sha512 of msg(64bytes) | msg_len(4bytes,big) | msg(<msg_len>bytes)
In which, msg is pickled python object.
"""


from __future__ import annotations

import hashlib
import hmac
import logging
import multiprocessing.shared_memory as mp_shm
import pickle
import time
from typing import Generic

from otaclient_common.typing import T

logger = logging.getLogger(__name__)

DEFAULT_HASH_ALG = "sha512"
DEFAULT_KEY_LEN = hashlib.new(DEFAULT_HASH_ALG).digest_size

RWLOCK_LEN = 1 # byte
PAYLOAD_LEN_BYTES = 4 # bytes

RWLOCK_LOCKED = b"\xab"
RWLOCK_OPEN = b"\x54"


class RWBusy(Exception): ...


class SHA512Verifier:
"""Base class for specifying hash alg related configurations."""

DIGEST_ALG = "sha512"
DIGEST_SIZE = hashlib.new(DIGEST_ALG).digest_size
MIN_ENCAP_MSG_LEN = RWLOCK_LEN + DIGEST_SIZE + PAYLOAD_LEN_BYTES

_key: bytes

def cal_hmac(self, _raw_msg: bytes) -> bytes:
return hmac.digest(key=self._key, msg=_raw_msg, digest=self.DIGEST_ALG)

def verify_msg(self, _raw_msg: bytes, _expected_hmac: bytes) -> bool:
return hmac.compare_digest(
hmac.digest(
key=self._key,
msg=_raw_msg,
digest=self.DIGEST_ALG,
),
_expected_hmac,
)


def _ensure_connect_shm(
name: str, *, max_retry: int, retry_interval: int
) -> mp_shm.SharedMemory:
for _idx in range(max_retry):
try:
return mp_shm.SharedMemory(name=name, create=False)
except Exception as e:
logger.warning(
f"retry #{_idx}: failed to connect to {name=}: {e!r}, keep retrying ..."
)
time.sleep(retry_interval)
raise ValueError(f"failed to connect share memory with {name=}")


class MPSharedStatusReader(SHA512Verifier, Generic[T]):

def __init__(
self,
*,
name: str,
key: bytes,
max_retry: int = 6,
retry_interval: int = 1,
) -> None:
self._shm = shm = _ensure_connect_shm(
name, max_retry=max_retry, retry_interval=retry_interval
)
self.mem_size = size = shm.size
self.msg_max_size = size - self.MIN_ENCAP_MSG_LEN
self._key = key

def atexit(self) -> None:
self._shm.close()

def sync_msg(self) -> T:
"""Get msg from shared memory.
Raises:
RWBusy if rwlock indicates the writer is writing or not yet ready.
ValueError for invalid msg.
"""
buffer = self._shm.buf

# check if we can read
_cursor = 0
rwlock = bytes(buffer[_cursor:RWLOCK_LEN])
if rwlock != RWLOCK_OPEN:
if rwlock == RWLOCK_LOCKED:
raise RWBusy("write in progress, abort")
raise RWBusy("no msg has been written yet")
_cursor += RWLOCK_LEN

# parsing the msg
input_hmac = bytes(buffer[_cursor : _cursor + self.DIGEST_SIZE])
_cursor += self.DIGEST_SIZE

_payload_len_bytes = bytes(buffer[_cursor : _cursor + PAYLOAD_LEN_BYTES])
payload_len = int.from_bytes(_payload_len_bytes, "big", signed=False)
_cursor += PAYLOAD_LEN_BYTES

if payload_len > self.msg_max_size:
raise ValueError(f"invalid msg: {payload_len=} > {self.msg_max_size}")

payload = bytes(buffer[_cursor : _cursor + payload_len])
if self.verify_msg(payload, input_hmac):
return pickle.loads(payload)
raise ValueError("failed to validate input msg")


class MPSharedStatusWriter(SHA512Verifier, Generic[T]):

def __init__(
self,
*,
name: str | None = None,
size: int = 0,
key: bytes,
create: bool = False,
msg_max_size: int | None = None,
max_retry: int = 6,
retry_interval: int = 1,
) -> None:
if create:
_msg_max_size = size - self.MIN_ENCAP_MSG_LEN
if _msg_max_size < 0:
raise ValueError(f"{size=} < {self.MIN_ENCAP_MSG_LEN=}")
self._shm = shm = mp_shm.SharedMemory(name=name, size=size, create=True)
self.mem_size = shm.size

elif name:
self._shm = shm = _ensure_connect_shm(
name, max_retry=max_retry, retry_interval=retry_interval
)
self.mem_size = size = shm.size
_msg_max_size = size - self.MIN_ENCAP_MSG_LEN
if _msg_max_size < 0:
shm.close()
raise ValueError(f"{size=} < {self.MIN_ENCAP_MSG_LEN=}")
else:
raise ValueError("<name> must be specified if <create> is False")

self.name = shm.name
self._key = key
self.msg_max_size = min(_msg_max_size, msg_max_size or float("infinity"))

def atexit(self) -> None:
self._shm.close()

def write_msg(self, obj: T) -> None:
"""Write msg to shared memory.
Raises:
ValueError on invalid msg or exceeding shared memory size.
"""
buffer = self._shm.buf
_pickled = pickle.dumps(obj)
_pickled_len = len(_pickled)

if _pickled_len > self.msg_max_size:
raise ValueError(f"exceed {self.msg_max_size=}: {_pickled_len=}")

msg = b"".join(
[
RWLOCK_LOCKED,
self.cal_hmac(_pickled),
_pickled_len.to_bytes(PAYLOAD_LEN_BYTES, "big", signed=False),
_pickled,
]
)
msg_len = len(msg)
if msg_len > self.mem_size:
raise ValueError(f"{msg_len=} > {self.mem_size=}")

buffer[:msg_len] = msg
buffer[:1] = RWLOCK_OPEN
Loading

1 comment on commit afc2f67

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage

Coverage Report
FileStmtsMissCoverMissing
src/ota_metadata/legacy
   __init__.py10100% 
   parser.py3264386%100, 156, 161, 197–198, 208–209, 212, 224, 277–279, 283–286, 320–323, 392, 395, 403–405, 418, 427–428, 431–432, 597–599, 649–650, 653, 681–683, 737, 740–742
   types.py841384%37, 40–42, 112–116, 122–125
src/ota_metadata/utils
   cert_store.py420100% 
src/ota_proxy
   __init__.py15660%48, 50, 52, 61, 71–72
   __main__.py880%16, 18–20, 22, 24–25, 27
   _consts.py150100% 
   cache_control_header.py68494%71, 91, 113, 121
   cache_streaming.py1442284%154–156, 184–186, 211, 225, 229–230, 265–266, 268, 280, 349, 355–356, 359, 367–370
   config.py200100% 
   db.py731875%109, 115, 153, 159–160, 163, 169, 171, 192–199, 201–202
   errors.py50100% 
   external_cache.py282028%31, 35, 40–42, 44–45, 48–49, 51–53, 60, 63–65, 69–72
   lru_cache_helper.py47295%84–85
   ota_cache.py2216271%71–72, 147, 150–151, 163–164, 196–197, 214, 235, 254–258, 262–264, 266, 268–275, 277–279, 282–283, 287–288, 292, 339, 347–349, 428–431, 445, 448–449, 462–463, 465–467, 471–472, 478–479, 510, 516, 543, 595–597
   server_app.py1393971%76, 79, 85, 101, 103, 162, 171, 213–214, 216–218, 221, 226–228, 231–232, 235, 238, 241, 244, 257–258, 261–262, 264, 267, 293–296, 299, 313–315, 321–323
   utils.py140100% 
src/otaclient
   __init__.py5260%17, 19
   __main__.py110%16
   _logging.py513335%43–44, 46–47, 49–54, 56–57, 59–60, 62–65, 67, 77, 80–82, 84–86, 89–90, 92–96
   _status_monitor.py1611193%46, 48–49, 157, 160, 177, 180, 196–197, 204, 207
   _types.py750100% 
   errors.py120199%97
   main.py492940%30, 32–39, 41–42, 44–45, 47–48, 52, 54, 59, 61, 67–69, 72–73, 77–80, 82
   ota_core.py3009070%97, 124, 126–127, 129, 133, 137–138, 143–144, 150, 152, 191–194, 200, 204, 210, 349, 381–382, 384, 393, 396, 401–402, 405, 411, 413–417, 453–456, 459–466, 502–505, 583–590, 595, 598–605, 638–639, 645, 649–650, 656, 681–683, 685, 760, 788–789, 791–792, 800, 802–808
   utils.py37294%73–74
src/otaclient/boot_control
   __init__.py40100% 
   _firmware_package.py932276%82, 86, 136, 180, 186, 209–210, 213–218, 220–221, 224–229, 231
   _grub.py41812769%214, 262–265, 271–275, 312–313, 320–325, 328–334, 337, 340–341, 346, 348–350, 359–365, 367–368, 370–372, 381–383, 385–387, 466–467, 471–472, 524, 530, 556, 578, 582–583, 598–600, 624–627, 639, 643–645, 647–649, 708–711, 736–739, 762–765, 777–778, 781–782, 817, 823, 843–844, 846, 871–873, 891–894, 919–922, 929–932, 937–945, 950–957
   _jetson_cboot.py2612610%20, 22–25, 27–29, 35–40, 42, 58–60, 62, 64–65, 71, 75, 134, 137, 139–140, 143, 150–151, 159–160, 163, 169–170, 178, 187–191, 193, 199, 202–203, 209, 212–213, 218–219, 221, 227–228, 231–232, 235–237, 239, 245, 250–252, 254–256, 261, 263–266, 268–269, 278–279, 282–283, 288–289, 292–296, 299–300, 305–306, 309, 312–316, 321–324, 327, 330–331, 334, 337–338, 341, 345–350, 354–355, 359, 362–363, 366, 369–372, 374, 377–378, 382, 385, 388–391, 393, 400, 404–405, 408–409, 415–416, 422, 424–425, 429, 431, 433–435, 438, 442, 445, 448–449, 451, 454, 462–463, 470, 480, 483, 491–492, 497–500, 502, 509, 511–513, 519–520, 524–525, 528, 532, 535, 537, 544–548, 550, 562–565, 568, 571, 573, 580, 590–592, 594, 596, 599, 602, 605, 607–608, 611–615, 619–621, 623, 631–635, 637, 640, 644, 647, 658–659, 664, 674, 677–683, 687–693, 697–706, 710–718, 722, 724, 726–728
   _jetson_common.py1724573%132, 140, 288–291, 294, 311, 319, 354, 359–364, 382, 408–409, 411–413, 417–420, 422–423, 425–429, 431, 438–439, 442–443, 453, 456–457, 460, 462, 506–507
   _jetson_uefi.py40427432%124–126, 131–132, 151–153, 158–161, 328, 446, 448–451, 455, 459–460, 462–470, 472, 484–485, 488–489, 492–493, 496–498, 502–503, 508–510, 514, 518–519, 522–523, 526–527, 531, 534–535, 537, 542–543, 547, 550–551, 556, 560–561, 564, 568–570, 572, 576–579, 581–582, 604–605, 609–610, 612, 616, 620–621, 624–625, 632, 635–637, 640, 642–643, 648–649, 652–655, 657–658, 663, 665–666, 674, 677–680, 682–683, 685, 689–690, 694, 702–706, 709–710, 712, 715–719, 722, 725–729, 733–734, 737–742, 745–746, 749–752, 754–755, 762–763, 773–776, 779, 782–785, 788–792, 795–796, 799, 802–805, 808, 810, 815–816, 819, 822–825, 827, 833, 838–839, 858–859, 862, 870–871, 878, 888, 891, 898–899, 904–907, 915–918, 926–927, 939–942, 944, 947, 950, 958, 969–971, 973–975, 977–981, 986–987, 989, 1002, 1006, 1009, 1019, 1024, 1032–1033, 1036, 1040, 1042–1044, 1050–1051, 1056, 1064–1069, 1074–1079, 1084–1092, 1097–1104, 1112–1114
   _ota_status_control.py1021189%117, 122, 127, 240, 244–245, 248, 255, 257–258, 273
   _rpi_boot.py28713353%53, 56, 120–121, 125, 133–136, 150–153, 158–159, 161–162, 167–168, 171–172, 181–182, 222, 228–232, 235, 253–255, 259–261, 266–268, 272–274, 284–285, 288, 291, 293–294, 296–297, 299–301, 307, 310–311, 321–324, 332–336, 338, 340–341, 346–347, 354, 357–362, 393, 395–398, 408–411, 415–416, 418–422, 450–453, 472–475, 501–504, 509–517, 522–529, 544–547, 554–557, 565–567
   _slot_mnt_helper.py100100% 
   configs.py510100% 
   protocol.py60100% 
   selecter.py412929%44–46, 49–50, 54–55, 58–60, 63, 65, 69, 77–79, 81–82, 84–85, 89, 91, 93–94, 96, 98–99, 101, 103
src/otaclient/configs
   __init__.py170100% 
   _cfg_configurable.py470100% 
   _cfg_consts.py47197%97
   _common.py80100% 
   _ecu_info.py56394%59, 64–65
   _proxy_info.py51590%85, 87–88, 90, 101
   cfg.py190100% 
src/otaclient/create_standby
   __init__.py13192%36
   common.py2264480%59, 62–63, 67–69, 71, 75–76, 78, 126, 174–176, 178–180, 182, 185–188, 192, 203, 279–280, 282–287, 299, 339, 367, 370–372, 388–389, 403, 407, 429–430
   interface.py70100% 
   rebuild_mode.py1151091%98–100, 119, 150–155
src/otaclient/grpc
   _otaproxy_ctx.py644135%38, 40–41, 43–45, 47, 52–56, 58, 77–78, 80, 83, 87, 100–102, 106–107, 109, 111–112, 118–120, 124, 133–134, 136–141, 143–145
src/otaclient/grpc/api_v2
   ecu_status.py1531093%75, 78, 81, 147, 171, 173, 300, 370–371, 410
   ecu_tracker.py341944%40–42, 48, 52–54, 61–63, 66, 70–74, 77–79
   servicer.py1273671%82, 171–173, 180, 191–192, 233–234, 237–241, 250–259, 266, 272, 275–278, 284–285, 292, 295, 301, 304
   types.py46295%78–79
src/otaclient_api/v2
   api_caller.py39684%45–47, 83–85
   types.py2562391%86, 89–92, 131, 209–210, 212, 259, 262–263, 506–508, 512–513, 515, 518–519, 522–523, 586
src/otaclient_common
   __init__.py341555%42–44, 61, 63, 68–77
   _io.py64198%41
   cmdhelper.py130100% 
   common.py1061189%44, 148, 151–153, 168, 175–177, 271, 275
   downloader.py1991094%107–108, 126, 153, 369, 424, 428, 516–517, 526
   linux.py611575%51–53, 59, 69, 74, 76, 108–109, 133–134, 190, 195–196, 198
   logging.py29196%55
   persist_file_handling.py1181884%113, 118, 150–152, 163, 192–193, 228–232, 242–244, 246–247
   proto_streamer.py42880%33, 48, 66–67, 72, 81–82, 100
   proto_wrapper.py3984688%87, 165, 172, 184–186, 205, 210, 221, 257, 263, 268, 299, 303, 307, 402, 462, 469, 472, 492, 499, 501, 526, 532, 535, 537, 562, 568, 571, 573, 605, 609, 611, 625, 642, 669, 672, 676, 707, 713, 762–763, 765, 803–805
   retry_task_map.py105595%158–159, 161, 181–182
   shm_status.py952177%79–80, 83–84, 105, 120–122, 134, 139, 156–160, 169–170, 172, 179, 192, 204
   typing.py31487%48, 97–98, 100
TOTAL6518166474% 

Tests Skipped Failures Errors Time
236 0 💤 0 ❌ 0 🔥 11m 56s ⏱️

Please sign in to comment.