Skip to content

Commit

Permalink
Add pretty_index for printing index and subindex
Browse files Browse the repository at this point in the history
* Change print statements to use pretty_index()
* Updates from code review
  • Loading branch information
sveinse committed May 27, 2024
1 parent 0b2838f commit 7129fed
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 27 deletions.
18 changes: 9 additions & 9 deletions canopen/objectdictionary/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from canopen.objectdictionary.datatypes import *
from canopen.objectdictionary.datatypes_24bit import Integer24, Unsigned24
from canopen.utils import pretty_index

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -108,8 +109,7 @@ def __getitem__(
if isinstance(index, str) and '.' in index:
idx, sub = index.split('.', maxsplit=1)
return self[idx][sub]
name = f"0x{index:04X}" if isinstance(index, int) else f"{index!r}"
raise KeyError(f"{name} was not found in Object Dictionary")
raise KeyError(f"{pretty_index(index)} was not found in Object Dictionary")
return item

def __setitem__(
Expand Down Expand Up @@ -180,12 +180,12 @@ def __init__(self, name: str, index: int):
self.names = {}

def __repr__(self) -> str:
return f"<{type(self).__qualname__} {self.name!r} at 0x{self.index:04X}>"
return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index)}>"

def __getitem__(self, subindex: Union[int, str]) -> "ODVariable":
item = self.names.get(subindex) or self.subindices.get(subindex)
if item is None:
raise KeyError(f"Subindex {subindex!r} was not found")
raise KeyError(f"Subindex {pretty_index(None, subindex)} was not found")
return item

def __setitem__(self, subindex: Union[int, str], var: "ODVariable"):
Expand Down Expand Up @@ -239,7 +239,7 @@ def __init__(self, name: str, index: int):
self.names = {}

def __repr__(self) -> str:
return f"<{type(self).__qualname__} {self.name!r} at 0x{self.index:04X}>"
return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index)}>"

def __getitem__(self, subindex: Union[int, str]) -> "ODVariable":
var = self.names.get(subindex) or self.subindices.get(subindex)
Expand All @@ -258,7 +258,7 @@ def __getitem__(self, subindex: Union[int, str]) -> "ODVariable":
if attr in template.__dict__:
var.__dict__[attr] = template.__dict__[attr]
else:
raise KeyError(f"Could not find subindex {subindex!r}")
raise KeyError(f"Could not find subindex {pretty_index(None, subindex)}")
return var

def __len__(self) -> int:
Expand Down Expand Up @@ -337,8 +337,8 @@ def __init__(self, name: str, index: int, subindex: int = 0):
self.pdo_mappable = False

def __repr__(self) -> str:
suffix = f":{self.subindex:02X}" if isinstance(self.parent, (ODRecord, ODArray)) else ""
return f"<{type(self).__qualname__} {self.qualname!r} at 0x{self.index:04X}{suffix}>"
subindex = self.subindex if isinstance(self.parent, (ODRecord, ODArray)) else None
return f"<{type(self).__qualname__} {self.qualname!r} at {pretty_index(self.index, subindex)}>"

@property
def qualname(self) -> str:
Expand Down Expand Up @@ -426,7 +426,7 @@ def encode_raw(self, value: Union[int, float, str, bytes, bytearray]) -> bytes:
raise ObjectDictionaryError("Data type has not been specified")
else:
raise TypeError(
f"Do not know how to encode {value!r} to data type {self.data_type:X}h")
f"Do not know how to encode {value!r} to data type 0x{self.data_type:X}")

def decode_phys(self, value: int) -> Union[int, bool, float, str, bytes]:
if self.data_type in INTEGER_TYPES:
Expand Down
8 changes: 4 additions & 4 deletions canopen/pdo/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ def __getitem_by_index(self, value):
valid_values.append(var.index)
if var.index == value:
return var
raise KeyError(f'{value} not found in map. Valid entries are ' +
", ".join(str(v) for v in valid_values))
raise KeyError(f"{value} not found in map. Valid entries are "
f"{', '.join(str(v) for v in valid_values)}")

def __getitem_by_name(self, value):
valid_values = []
Expand All @@ -211,8 +211,8 @@ def __getitem_by_name(self, value):
valid_values.append(var.name)
if var.name == value:
return var
raise KeyError(f'{value} not found in map. Valid entries are ' +
', '.join(valid_values))
raise KeyError(f"{value} not found in map. Valid entries are "
f"{', '.join(valid_values)}")

def __getitem__(self, key: Union[int, str]) -> "PdoVariable":
var = None
Expand Down
5 changes: 3 additions & 2 deletions canopen/sdo/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from canopen import objectdictionary
from canopen.objectdictionary import ObjectDictionary
from canopen import variable
from canopen.utils import pretty_index


class CrcXmodem:
Expand Down Expand Up @@ -97,7 +98,7 @@ def __init__(self, sdo_node: SdoBase, od: ObjectDictionary):
self.od = od

def __repr__(self) -> str:
return f"<{type(self).__qualname__} {self.od.name!r} at 0x{self.od.index:04X}>"
return f"<{type(self).__qualname__} {self.od.name!r} at {pretty_index(self.od.index)}>"

def __getitem__(self, subindex: Union[int, str]) -> "SdoVariable":
return SdoVariable(self.sdo_node, self.od[subindex])
Expand All @@ -119,7 +120,7 @@ def __init__(self, sdo_node: SdoBase, od: ObjectDictionary):
self.od = od

def __repr__(self) -> str:
return f"<{type(self).__qualname__} {self.od.name!r} at 0x{self.od.index:04X}>"
return f"<{type(self).__qualname__} {self.od.name!r} at {pretty_index(self.od.index)}>"

def __getitem__(self, subindex: Union[int, str]) -> "SdoVariable":
return SdoVariable(self.sdo_node, self.od[subindex])
Expand Down
7 changes: 4 additions & 3 deletions canopen/sdo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from canopen.network import CanError
from canopen import objectdictionary
from canopen.sdo.base import SdoBase
from canopen.utils import pretty_index
from canopen.sdo.constants import *
from canopen.sdo.exceptions import *

Expand Down Expand Up @@ -258,7 +259,7 @@ def __init__(self, sdo_client, index, subindex=0):
# Check that the message is for us
if res_index != index or res_subindex != subindex:
raise SdoCommunicationError(
f"Node returned a value for 0x{res_index:04X}:{res_subindex:02X} instead, "
f"Node returned a value for {pretty_index(res_index, res_subindex)} instead, "
"maybe there is another SDO client communicating "
"on the same SDO channel?")

Expand Down Expand Up @@ -492,7 +493,7 @@ def __init__(self, sdo_client, index, subindex=0, request_crc_support=True):
if res_index != index or res_subindex != subindex:
self._error = True
raise SdoCommunicationError(
f"Node returned a value for 0x{res_index:04X}:{res_subindex:02X} instead, "
f"Node returned a value for {pretty_index(res_index, res_subindex)} instead, "
"maybe there is another SDO client communicating "
"on the same SDO channel?")
if res_command & BLOCK_SIZE_SPECIFIED:
Expand Down Expand Up @@ -661,7 +662,7 @@ def __init__(self, sdo_client, index, subindex=0, size=None, request_crc_support
if res_index != index or res_subindex != subindex:
self.sdo_client.abort()
raise SdoCommunicationError(
f"Node returned a value for 0x{res_index:04X}:{res_subindex:02X} instead, "
f"Node returned a value for {pretty_index(res_index, res_subindex)} instead, "
"maybe there is another SDO client communicating "
"on the same SDO channel?")
self._blksize, = struct.unpack_from("B", response, 4)
Expand Down
22 changes: 22 additions & 0 deletions canopen/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Additional utility functions for canopen."""
from typing import Optional, Union


def pretty_index(index: Optional[Union[int, str]],
sub: Optional[Union[int, str]] = None):
"""Format an index and subindex as a string."""

index_str = ""
if isinstance(index, int):
index_str = f"0x{index:04X}"
elif index:
index_str = f"{index!r}"

sub_str = ""
if isinstance(sub, int):
# Need 0x prefix if index is not present
sub_str = f"{'0x' if not index_str else ''}{sub:02X}"
elif sub:
sub_str = f"{sub!r}"

return ":".join(s for s in (index_str, sub_str) if s)
9 changes: 5 additions & 4 deletions canopen/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from collections.abc import Mapping

from canopen import objectdictionary
from canopen.utils import pretty_index

logger = logging.getLogger(__name__)

Expand All @@ -23,10 +24,10 @@ def __init__(self, od: objectdictionary.ODVariable):
self.subindex = od.subindex

def __repr__(self) -> str:
suffix = f":{self.subindex:02X}" if isinstance(self.od.parent,
subindex = self.subindex if isinstance(self.od.parent,
(objectdictionary.ODRecord, objectdictionary.ODArray)
) else ""
return f"<{type(self).__qualname__} {self.name!r} at 0x{self.index:04X}{suffix}>"
) else None
return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index, subindex)}>"

def get_data(self) -> bytes:
raise NotImplementedError("Variable is not readable")
Expand Down Expand Up @@ -76,7 +77,7 @@ def raw(self) -> Union[int, bool, float, str, bytes]:
written as :class:`bytes`.
"""
value = self.od.decode_raw(self.data)
text = f"Value of {self.name!r} (0x{self.index:04X}:{self.subindex:02X}) is {value!r}"
text = f"Value of {self.name!r} ({pretty_index(self.index, self.subindex)}) is {value!r}"
if value in self.od.value_descriptions:
text += f" ({self.od.value_descriptions[value]})"
logger.debug(text)
Expand Down
11 changes: 6 additions & 5 deletions test/test_eds.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import unittest
import canopen
from canopen.objectdictionary.eds import _signed_int_from_hex
from canopen.utils import pretty_index

EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds')

Expand Down Expand Up @@ -236,15 +237,15 @@ def test_export_eds(self):

for evar, avar in zip(expected_vars, actual_vars):
self.assertEqual(getattr(avar, "data_type", None), getattr(evar, "data_type", None),
f" mismatch on 0x{evar.index:04X}:{evar.subindex:02X}")
f" mismatch on {pretty_index(evar.index, evar.subindex)}")
self.assertEqual(getattr(avar, "default_raw", None), getattr(evar, "default_raw", None),
f" mismatch on 0x{evar.index:04X}:{evar.subindex:02X}")
f" mismatch on {pretty_index(evar.index, evar.subindex)}")
self.assertEqual(getattr(avar, "min", None), getattr(evar, "min", None),
f" mismatch on 0x{evar.index:04X}:{evar.subindex:02X}")
f" mismatch on {pretty_index(evar.index, evar.subindex)}")
self.assertEqual(getattr(avar, "max", None), getattr(evar, "max", None),
f" mismatch on 0x{evar.index:04X}:{evar.subindex:02X}")
f" mismatch on {pretty_index(evar.index, evar.subindex)}")
if doctype == "dcf":
self.assertEqual(getattr(avar, "value", None), getattr(evar, "value", None),
f" mismatch on 0x{evar.index:04X}:{evar.subindex:02X}")
f" mismatch on {pretty_index(evar.index, evar.subindex)}")

self.assertEqual(self.od.comments, exported_od.comments)
20 changes: 20 additions & 0 deletions test/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import unittest
from canopen.utils import pretty_index


class TestUtils(unittest.TestCase):

def test_pretty_index(self):
self.assertEqual(pretty_index(0x12ab), "0x12AB")
self.assertEqual(pretty_index(0x12ab, 0xcd), "0x12AB:CD")
self.assertEqual(pretty_index(0x12ab, ""), "0x12AB")
self.assertEqual(pretty_index("test"), "'test'")
self.assertEqual(pretty_index("test", 0xcd), "'test':CD")
self.assertEqual(pretty_index(None), "")
self.assertEqual(pretty_index(""), "")
self.assertEqual(pretty_index("", ""), "")
self.assertEqual(pretty_index(None, 0xab), "0xAB")


if __name__ == "__main__":
unittest.main()

0 comments on commit 7129fed

Please sign in to comment.