Skip to content

Commit

Permalink
mil_usb_to_can.sub9: Prevent Packet.from_bytes from returning anythin…
Browse files Browse the repository at this point in the history
…g but subclasses, improve documentation
  • Loading branch information
cbrxyz committed Sep 8, 2024
1 parent 78b5d23 commit a243432
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 6 deletions.
16 changes: 16 additions & 0 deletions docs/reference/can.rst
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,22 @@ byte length.
| Status) | | | |
+------------+--------------+----------------+-------------------------------------------------------------------------+

Exceptions
~~~~~~~~~~

Exception Hierarchy
"""""""""""""""""""
.. currentmodule:: mil_usb_to_can.sub9

.. exception_hierarchy::

- :exc:`ChecksumException`

Exception List
"""""""""""""""""""
.. autoclass:: mil_usb_to_can.sub9.ChecksumException
:members:

CANDeviceHandle
~~~~~~~~~~~~~~~
.. attributetable:: mil_usb_to_can.sub9.CANDeviceHandle
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .device import CANDeviceHandle, SimulatedCANDeviceHandle
from .packet import AckPacket, NackPacket, Packet
from .packet import AckPacket, ChecksumException, NackPacket, Packet
34 changes: 29 additions & 5 deletions mil_common/drivers/mil_usb_to_can/mil_usb_to_can/sub9/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
from dataclasses import dataclass
from enum import Enum
from functools import lru_cache
from typing import ClassVar, get_type_hints
from typing import ClassVar, TypeVar, get_type_hints

SYNC_CHAR_1 = 0x37
SYNC_CHAR_2 = 0x01

_packet_registry: dict[int, dict[int, type[Packet]]] = {}


PacketSelf = TypeVar("PacketSelf", bound="Packet")


def hexify(data: bytes) -> str:
return ":".join(f"{c:02x}" for c in data)

Expand All @@ -22,12 +25,22 @@ def get_cache_hints(cls):


class ChecksumException(OSError):
"""
An invalid checksum appeared.
"""

def __init__(
self,
packet: type[Packet],
received: tuple[int, int],
expected: tuple[int, int],
):
"""
Attributes:
packet (Type[:class:`~.Packet`]): The packet with the invalid checksum.
received (Tuple[int, int]): The received Fletcher's checksum.
expected (Tuple[int, int]): The expected Fletcher's checksum.
"""
super().__init__(
f"Invalid checksum in packet of type {packet.__qualname__}: received {received}, expected {expected}",
)
Expand Down Expand Up @@ -129,12 +142,18 @@ def __bytes__(self):
return data + struct.pack("<BB", *checksum)

@classmethod
def from_bytes(cls, packed: bytes) -> Packet:
def from_bytes(cls: type[PacketSelf], packed: bytes) -> PacketSelf:
"""
Constructs a packet from a packed packet in a :class:`bytes` object.
If a packet is found with the corresponding message and subclass ID,
then an instance of that packet class will be returned, else :class:`Packet`
will be returned.
then an instance (or subclass) of that packet class will be returned.
Raises:
ChecksumException: The checksum is invalid.
LookupError: No packet with the specified class and subclass IDs exist.
Returns:
An instance of the appropriate packet subclass.
"""
msg_id = packed[2]
subclass_id = packed[3]
Expand All @@ -150,7 +169,12 @@ def from_bytes(cls, packed: bytes) -> Packet:
cls._calculate_checksum(packed[2:-2]),
)
unpacked = struct.unpack(subclass.payload_format, payload)
return subclass(*unpacked)
packet = subclass(*unpacked)
if not isinstance(packet, cls):
raise RuntimeError(
f"Attempted to resolve packet of type {cls.__qualname__}, but found {packet.__class__.__qualname__} for bytes: {hexify(payload)}",
)
return packet
raise LookupError(
f"Attempted to reconstruct packet with msg_id 0x{msg_id:02x} and subclass_id 0x{subclass_id:02x}, but no packet with IDs was found.",
)
Expand Down
11 changes: 11 additions & 0 deletions mil_common/drivers/mil_usb_to_can/test/test_packets_sub9.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ class TestPacket(Packet, msg_id=0x47, subclass_id=0x44, payload_format="?Hd"):
example_float: float


@dataclass
class TestPacketTwo(Packet, msg_id=0x47, subclass_id=0x45, payload_format=""):
pass


class BasicApplicationPacketTest(unittest.IsolatedAsyncioTestCase):
"""
Tests basic application packet functionality.
Expand Down Expand Up @@ -43,6 +48,12 @@ def test_format(self):
TestPacket.from_bytes(TestPacket.__bytes__(packet)),
packet,
)
self.assertEqual(
Packet.from_bytes(TestPacket.__bytes__(packet)),
packet,
)
with self.assertRaises(RuntimeError):
TestPacketTwo.from_bytes(bytes(packet))

def test_comparisons(self):
packet = TestPacket(False, 42, 3.14)
Expand Down

0 comments on commit a243432

Please sign in to comment.