diff --git a/canopen/objectdictionary/__init__.py b/canopen/objectdictionary/__init__.py index 56dbbee2..156972fd 100644 --- a/canopen/objectdictionary/__init__.py +++ b/canopen/objectdictionary/__init__.py @@ -9,7 +9,6 @@ import logging from canopen.objectdictionary.datatypes import * -from canopen.objectdictionary.datatypes_24bit import Integer24, Unsigned24 from canopen.utils import pretty_index logger = logging.getLogger(__name__) @@ -282,17 +281,25 @@ def add_member(self, variable: ODVariable) -> None: class ODVariable: """Simple variable.""" - STRUCT_TYPES = { + STRUCT_TYPES: dict[int, struct.Struct] = { + # Use struct module to pack/unpack data where possible and use the + # custom IntegerN and UnsignedN classes for the special data types. BOOLEAN: struct.Struct("?"), INTEGER8: struct.Struct("b"), INTEGER16: struct.Struct(" None: def decode_raw(self, data: bytes) -> Union[int, float, str, bytes, bytearray]: if self.data_type == VISIBLE_STRING: - return data.rstrip(b"\x00").decode("ascii", errors="ignore") + # Strip any trailing NUL characters from C-based systems + return data.decode("ascii", errors="ignore").rstrip("\x00") elif self.data_type == UNICODE_STRING: - # Is this correct? - return data.rstrip(b"\x00").decode("utf_16_le", errors="ignore") + # The CANopen standard does not specify the encoding. This + # library assumes UTF-16, being the most common two-byte encoding format. + # Strip any trailing NUL characters from C-based systems + return data.decode("utf_16_le", errors="ignore").rstrip("\x00") elif self.data_type in self.STRUCT_TYPES: try: value, = self.STRUCT_TYPES[self.data_type].unpack(data) @@ -407,8 +417,9 @@ def encode_raw(self, value: Union[int, float, str, bytes, bytearray]) -> bytes: elif self.data_type == VISIBLE_STRING: return value.encode("ascii") elif self.data_type == UNICODE_STRING: - # Is this correct? return value.encode("utf_16_le") + elif self.data_type in (DOMAIN, OCTET_STRING): + return bytes(value) elif self.data_type in self.STRUCT_TYPES: if self.data_type in INTEGER_TYPES: value = int(value) diff --git a/canopen/objectdictionary/datatypes.py b/canopen/objectdictionary/datatypes.py index fa849d5c..ad37fe6f 100644 --- a/canopen/objectdictionary/datatypes.py +++ b/canopen/objectdictionary/datatypes.py @@ -1,3 +1,4 @@ +import struct BOOLEAN = 0x1 INTEGER8 = 0x2 @@ -10,16 +11,116 @@ VISIBLE_STRING = 0x9 OCTET_STRING = 0xA UNICODE_STRING = 0xB +TIME_OF_DAY = 0xC +TIME_DIFFERENCE = 0xD DOMAIN = 0xF INTEGER24 = 0x10 REAL64 = 0x11 +INTEGER40 = 0x12 +INTEGER48 = 0x13 +INTEGER56 = 0x14 INTEGER64 = 0x15 UNSIGNED24 = 0x16 +UNSIGNED40 = 0x18 +UNSIGNED48 = 0x19 +UNSIGNED56 = 0x1A UNSIGNED64 = 0x1B +PDO_COMMUNICATION_PARAMETER = 0x20 +PDO_MAPPING = 0x21 +SDO_PARAMETER = 0x22 +IDENTITY = 0x23 -SIGNED_TYPES = (INTEGER8, INTEGER16, INTEGER24, INTEGER32, INTEGER64) -UNSIGNED_TYPES = (UNSIGNED8, UNSIGNED16, UNSIGNED24, UNSIGNED32, UNSIGNED64) +SIGNED_TYPES = ( + INTEGER8, + INTEGER16, + INTEGER24, + INTEGER32, + INTEGER40, + INTEGER48, + INTEGER56, + INTEGER64, +) +UNSIGNED_TYPES = ( + UNSIGNED8, + UNSIGNED16, + UNSIGNED24, + UNSIGNED32, + UNSIGNED40, + UNSIGNED48, + UNSIGNED56, + UNSIGNED64, +) INTEGER_TYPES = SIGNED_TYPES + UNSIGNED_TYPES FLOAT_TYPES = (REAL32, REAL64) NUMBER_TYPES = INTEGER_TYPES + FLOAT_TYPES DATA_TYPES = (VISIBLE_STRING, OCTET_STRING, UNICODE_STRING, DOMAIN) + + +class UnsignedN(struct.Struct): + """Packing and unpacking unsigned integers of arbitrary width, like struct.Struct. + + The width must be a multiple of 8 and must be between 8 and 64. + """ + + def __init__(self, width: int): + self.width = width + if width % 8 != 0: + raise ValueError("Width must be a multiple of 8") + if width <= 0 or width > 64: + raise ValueError("Invalid width for UnsignedN") + elif width <= 8: + fmt = "B" + elif width <= 16: + fmt = " int: + return self.width // 8 + + +class IntegerN(struct.Struct): + """Packing and unpacking integers of arbitrary width, like struct.Struct. + + The width must be a multiple of 8 and must be between 8 and 64. + """ + + def __init__(self, width: int): + self.width = width + if width % 8 != 0: + raise ValueError("Width must be a multiple of 8") + if width <= 0 or width > 64: + raise ValueError("Invalid width for IntegerN") + elif width <= 8: + fmt = "b" + elif width <= 16: + fmt = " 0 + return super().unpack( + buffer + (b'\xff' if neg else b'\x00') * (super().size - self.size) + ) + + def pack(self, *v): + return super().pack(*v)[:self.size] + + @property + def size(self) -> int: + return self.width // 8 diff --git a/canopen/objectdictionary/datatypes_24bit.py b/canopen/objectdictionary/datatypes_24bit.py deleted file mode 100644 index 475e7526..00000000 --- a/canopen/objectdictionary/datatypes_24bit.py +++ /dev/null @@ -1,33 +0,0 @@ -import struct - - -class Unsigned24: - def __init__(self): - self.__st = struct.Struct(" 0 - return self.__st.unpack(__buffer + (b'\xff' if neg else b'\x00')) - - def pack(self, *v): - return self.__st.pack(*v)[:3] - - @property - def size(self): - return 3 diff --git a/test/datatypes.eds b/test/datatypes.eds new file mode 100644 index 00000000..ff858cb1 --- /dev/null +++ b/test/datatypes.eds @@ -0,0 +1,310 @@ +[FileInfo] +FileName=datatypes.eds +FileVersion=1 +FileRevision=1 +EDSVersion=4.0 +Description=OD implementing the CANOpen datatype catalog +CreationTime=07:31PM +CreationDate=05-24-2024 +CreatedBy=objdictgen +ModificationTime=07:31PM +ModificationDate=05-24-2024 +ModifiedBy=objdictgen + +[DeviceInfo] +VendorName=objdictgen +VendorNumber=0x00000000 +ProductName=Alltypes +ProductNumber=0x00000000 +RevisionNumber=0x00000000 +BaudRate_10=1 +BaudRate_20=1 +BaudRate_50=1 +BaudRate_125=1 +BaudRate_250=1 +BaudRate_500=1 +BaudRate_800=1 +BaudRate_1000=1 +SimpleBootUpMaster=1 +SimpleBootUpSlave=0 +Granularity=8 +DynamicChannelsSupported=0 +CompactPDO=0 +GroupMessaging=0 +NrOfRXPDO=0 +NrOfTXPDO=0 +LSS_Supported=0 + +[DummyUsage] +Dummy0001=0 +Dummy0002=1 +Dummy0003=1 +Dummy0004=1 +Dummy0005=1 +Dummy0006=1 +Dummy0007=1 + +[Comments] +Lines=0 + +[MandatoryObjects] +SupportedObjects=1 +1=0x1018 + +[1018] +ParameterName=Identity +ObjectType=0x9 +SubNumber=5 + +[1018sub0] +ParameterName=Number of Entries +ObjectType=0x7 +DataType=0x0005 +AccessType=ro +DefaultValue=4 +PDOMapping=0 + +[1018sub1] +ParameterName=Vendor ID +ObjectType=0x7 +DataType=0x0007 +AccessType=ro +DefaultValue=0 +PDOMapping=0 + +[1018sub2] +ParameterName=Product Code +ObjectType=0x7 +DataType=0x0007 +AccessType=ro +DefaultValue=0 +PDOMapping=0 + +[1018sub3] +ParameterName=Revision Number +ObjectType=0x7 +DataType=0x0007 +AccessType=ro +DefaultValue=0 +PDOMapping=0 + +[1018sub4] +ParameterName=Serial Number +ObjectType=0x7 +DataType=0x0007 +AccessType=ro +DefaultValue=0 +PDOMapping=0 + +[OptionalObjects] +SupportedObjects=0 + +[ManufacturerObjects] +SupportedObjects=23 +1=0x2001 +2=0x2002 +3=0x2003 +4=0x2004 +5=0x2005 +6=0x2006 +7=0x2007 +8=0x2008 +9=0x2009 +10=0x200A +11=0x200B +12=0x200F +13=0x2010 +14=0x2011 +15=0x2012 +16=0x2013 +17=0x2014 +18=0x2015 +19=0x2016 +20=0x2018 +21=0x2019 +22=0x201A +23=0x201B + +[2001] +ParameterName=BOOLEAN +ObjectType=0x7 +DataType=0x0001 +AccessType=rw +DefaultValue=0 +PDOMapping=1 + +[2002] +ParameterName=INTEGER8 +ObjectType=0x7 +DataType=0x0002 +AccessType=rw +DefaultValue=12 +PDOMapping=1 + +[2003] +ParameterName=INTEGER16 +ObjectType=0x7 +DataType=0x0003 +AccessType=rw +DefaultValue=34 +PDOMapping=1 + +[2004] +ParameterName=INTEGER32 +ObjectType=0x7 +DataType=0x0004 +AccessType=rw +DefaultValue=45 +PDOMapping=1 + +[2005] +ParameterName=UNSIGNED8 +ObjectType=0x7 +DataType=0x0005 +AccessType=rw +DefaultValue=56 +PDOMapping=1 + +[2006] +ParameterName=UNSIGNED16 +ObjectType=0x7 +DataType=0x0006 +AccessType=rw +DefaultValue=8198 +PDOMapping=1 + +[2007] +ParameterName=UNSIGNED32 +ObjectType=0x7 +DataType=0x0007 +AccessType=rw +DefaultValue=537337864 +PDOMapping=1 + +[2008] +ParameterName=REAL32 +ObjectType=0x7 +DataType=0x0008 +AccessType=rw +DefaultValue=1.2 +PDOMapping=1 + +[2009] +ParameterName=VISIBLE_STRING +ObjectType=0x7 +DataType=0x0009 +AccessType=rw +DefaultValue=ABCD +PDOMapping=1 + +[200A] +ParameterName=OCTET_STRING +ObjectType=0x7 +DataType=0x000A +AccessType=rw +DefaultValue=ABCD +PDOMapping=1 + +[200B] +ParameterName=UNICODE_STRING +ObjectType=0x7 +DataType=0x000B +AccessType=rw +DefaultValue=abc✓ +PDOMapping=1 + +[200F] +ParameterName=DOMAIN +ObjectType=0x7 +DataType=0x000F +AccessType=rw +DefaultValue=@ABCD +PDOMapping=1 + +[2010] +ParameterName=INTEGER24 +ObjectType=0x7 +DataType=0x0010 +AccessType=rw +DefaultValue=-1 +PDOMapping=1 + +[2011] +ParameterName=REAL64 +ObjectType=0x7 +DataType=0x0011 +AccessType=rw +DefaultValue=1.6 +PDOMapping=1 + +[2012] +ParameterName=INTEGER40 +ObjectType=0x7 +DataType=0x0012 +AccessType=rw +DefaultValue=-40 +PDOMapping=1 + +[2013] +ParameterName=INTEGER48 +ObjectType=0x7 +DataType=0x0013 +AccessType=rw +DefaultValue=-48 +PDOMapping=1 + +[2014] +ParameterName=INTEGER56 +ObjectType=0x7 +DataType=0x0014 +AccessType=rw +DefaultValue=-56 +PDOMapping=1 + +[2015] +ParameterName=INTEGER64 +ObjectType=0x7 +DataType=0x0015 +AccessType=rw +DefaultValue=-64 +PDOMapping=1 + +[2016] +ParameterName=UNSIGNED24 +ObjectType=0x7 +DataType=0x0016 +AccessType=rw +DefaultValue=24 +PDOMapping=1 + +[2018] +ParameterName=UNSIGNED40 +ObjectType=0x7 +DataType=0x0018 +AccessType=rw +DefaultValue=40 +PDOMapping=1 + +[2019] +ParameterName=UNSIGNED48 +ObjectType=0x7 +DataType=0x0019 +AccessType=rw +DefaultValue=48 +PDOMapping=1 + +[201A] +ParameterName=UNSIGNED56 +ObjectType=0x7 +DataType=0x001A +AccessType=rw +DefaultValue=56 +PDOMapping=1 + +[201B] +ParameterName=UNSIGNED64 +ObjectType=0x7 +DataType=0x001B +AccessType=rw +DefaultValue=64 +PDOMapping=1 diff --git a/test/test_od.py b/test/test_od.py index 9c25bfcb..d3755234 100644 --- a/test/test_od.py +++ b/test/test_od.py @@ -36,6 +36,30 @@ def test_unsigned32(self): self.assertEqual(var.decode_raw(b"\xfc\xfd\xfe\xff"), 4294901244) self.assertEqual(var.encode_raw(4294901244), b"\xfc\xfd\xfe\xff") + def test_unsigned40(self): + var = od.ODVariable("Test UNSIGNED40", 0x1000) + var.data_type = od.UNSIGNED40 + self.assertEqual(var.decode_raw(b"\xfb\xfc\xfd\xfe\xff"), 0xfffefdfcfb) + self.assertEqual(var.encode_raw(0xfffefdfcfb), b"\xfb\xfc\xfd\xfe\xff") + + def test_unsigned48(self): + var = od.ODVariable("Test UNSIGNED48", 0x1000) + var.data_type = od.UNSIGNED48 + self.assertEqual(var.decode_raw(b"\xfa\xfb\xfc\xfd\xfe\xff"), 0xfffefdfcfbfa) + self.assertEqual(var.encode_raw(0xfffefdfcfbfa), b"\xfa\xfb\xfc\xfd\xfe\xff") + + def test_unsigned56(self): + var = od.ODVariable("Test UNSIGNED56", 0x1000) + var.data_type = od.UNSIGNED56 + self.assertEqual(var.decode_raw(b"\xf9\xfa\xfb\xfc\xfd\xfe\xff"), 0xfffefdfcfbfaf9) + self.assertEqual(var.encode_raw(0xfffefdfcfbfaf9), b"\xf9\xfa\xfb\xfc\xfd\xfe\xff") + + def test_unsigned64(self): + var = od.ODVariable("Test UNSIGNED64", 0x1000) + var.data_type = od.UNSIGNED64 + self.assertEqual(var.decode_raw(b"\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff"), 0xfffefdfcfbfaf9f8) + self.assertEqual(var.encode_raw(0xfffefdfcfbfaf9f8), b"\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff") + def test_integer8(self): var = od.ODVariable("Test INTEGER8", 0x1000) var.data_type = od.INTEGER8 @@ -64,8 +88,58 @@ def test_integer32(self): var = od.ODVariable("Test INTEGER32", 0x1000) var.data_type = od.INTEGER32 self.assertEqual(var.decode_raw(b"\xfe\xff\xff\xff"), -2) + self.assertEqual(var.decode_raw(b"\x01\x00\x00\x00"), 1) + self.assertEqual(var.encode_raw(1), b"\x01\x00\x00\x00") self.assertEqual(var.encode_raw(-2), b"\xfe\xff\xff\xff") + def test_integer40(self): + var = od.ODVariable("Test INTEGER40", 0x1000) + var.data_type = od.INTEGER40 + self.assertEqual(var.decode_raw(b"\xfe\xff\xff\xff\xff"), -2) + self.assertEqual(var.decode_raw(b"\x01\x00\x00\x00\x00"), 1) + self.assertEqual(var.encode_raw(-2), b"\xfe\xff\xff\xff\xff") + self.assertEqual(var.encode_raw(1), b"\x01\x00\x00\x00\x00") + + def test_integer48(self): + var = od.ODVariable("Test INTEGER48", 0x1000) + var.data_type = od.INTEGER48 + self.assertEqual(var.decode_raw(b"\xfe\xff\xff\xff\xff\xff"), -2) + self.assertEqual(var.decode_raw(b"\x01\x00\x00\x00\x00\x00"), 1) + self.assertEqual(var.encode_raw(-2), b"\xfe\xff\xff\xff\xff\xff") + self.assertEqual(var.encode_raw(1), b"\x01\x00\x00\x00\x00\x00") + + def test_integer56(self): + var = od.ODVariable("Test INTEGER56", 0x1000) + var.data_type = od.INTEGER56 + self.assertEqual(var.decode_raw(b"\xfe\xff\xff\xff\xff\xff\xff"), -2) + self.assertEqual(var.decode_raw(b"\x01\x00\x00\x00\x00\x00\x00"), 1) + self.assertEqual(var.encode_raw(-2), b"\xfe\xff\xff\xff\xff\xff\xff") + self.assertEqual(var.encode_raw(1), b"\x01\x00\x00\x00\x00\x00\x00") + + def test_integer64(self): + var = od.ODVariable("Test INTEGER64", 0x1000) + var.data_type = od.INTEGER64 + self.assertEqual(var.decode_raw(b"\xfe\xff\xff\xff\xff\xff\xff\xff"), -2) + self.assertEqual(var.decode_raw(b"\x01\x00\x00\x00\x00\x00\x00\x00"), 1) + self.assertEqual(var.encode_raw(-2), b"\xfe\xff\xff\xff\xff\xff\xff\xff") + self.assertEqual(var.encode_raw(1), b"\x01\x00\x00\x00\x00\x00\x00\x00") + + def test_real32(self): + var = od.ODVariable("Test REAL32", 0x1000) + var.data_type = od.REAL32 + # Select values that are exaclty representable in decimal notation + self.assertEqual(var.decode_raw(b"\x00\x00\x00\x00"), 0.) + self.assertEqual(var.decode_raw(b"\x00\x00\x60\x40"), 3.5) + self.assertEqual(var.decode_raw(b"\x00\x20\x7a\xc4"), -1000.5) + + def test_real64(self): + var = od.ODVariable("Test REAL64", 0x1000) + var.data_type = od.REAL64 + # Select values that are exaclty representable in decimal notation + self.assertEqual(var.decode_raw(b"\x00\x00\x00\x00\x00\x00\x00\x00"), 0.) + self.assertEqual(var.decode_raw(b"\x00\x00\x00\x00\x00\x4a\x93\x40"), 1234.5) + self.assertEqual(var.decode_raw(b"\x06\x81\x95\x43\x0b\x42\x8f\xc0"), -1000.2555) + def test_visible_string(self): var = od.ODVariable("Test VISIBLE_STRING", 0x1000) var.data_type = od.VISIBLE_STRING @@ -73,6 +147,29 @@ def test_visible_string(self): self.assertEqual(var.decode_raw(b"zero terminated\x00"), "zero terminated") self.assertEqual(var.encode_raw("testing"), b"testing") + def test_unicode_string(self): + var = od.ODVariable("Test UNICODE_STRING", 0x1000) + var.data_type = od.UNICODE_STRING + self.assertEqual(var.decode_raw(b"\x61\x00\x62\x00\x63\x00"), "abc") + self.assertEqual(var.decode_raw(b"\x61\x00\x62\x00\x63\x00\x00\x00"), "abc") # Zero terminated + self.assertEqual(var.encode_raw("abc"), b"\x61\x00\x62\x00\x63\x00") + self.assertEqual(var.decode_raw(b"\x60\x3f\x7d\x59"), "\u3f60\u597d") # Chinese "Nǐ hǎo", hello + self.assertEqual(var.encode_raw("\u3f60\u597d"), b"\x60\x3f\x7d\x59") # Chinese "Nǐ hǎo", hello + + def test_octet_string(self): + var = od.ODVariable("Test OCTET_STRING", 0x1000) + var.data_type = od.OCTET_STRING + self.assertEqual(var.decode_raw(b"abcdefg"), b"abcdefg") + self.assertEqual(var.decode_raw(b"zero terminated\x00"), b"zero terminated\x00") + self.assertEqual(var.encode_raw(b"testing"), b"testing") + + def test_domain(self): + var = od.ODVariable("Test DOMAIN", 0x1000) + var.data_type = od.DOMAIN + self.assertEqual(var.decode_raw(b"abcdefg"), b"abcdefg") + self.assertEqual(var.decode_raw(b"zero terminated\x00"), b"zero terminated\x00") + self.assertEqual(var.encode_raw(b"testing"), b"testing") + class TestAlternativeRepresentations(unittest.TestCase): diff --git a/test/test_sdo.py b/test/test_sdo.py index 31759c22..1ec35dab 100644 --- a/test/test_sdo.py +++ b/test/test_sdo.py @@ -2,8 +2,11 @@ import unittest # import binascii import canopen +from canopen.objectdictionary import ODVariable +import canopen.objectdictionary.datatypes as dt EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds') +DATAEDS_PATH = os.path.join(os.path.dirname(__file__), 'datatypes.eds') TX = 1 RX = 2 @@ -164,5 +167,327 @@ def test_add_sdo_channel(self): self.assertIn(client, self.network[2].sdo_channels) +class TestSDOClientDatatypes(unittest.TestCase): + """Test the SDO client uploads with the different data types in CANopen.""" + + def _send_message(self, can_id, data, remote=False): + """Will be used instead of the usual Network.send_message method. + + Checks that the message data is according to expected and answers + with the provided data. + """ + next_data = self.data.pop(0) + self.assertEqual(next_data[0], TX, "No transmission was expected") + # print("> %s (%s)" % (binascii.hexlify(data), binascii.hexlify(next_data[1]))) + self.assertSequenceEqual(data, next_data[1]) + self.assertEqual(can_id, 0x602) + while self.data and self.data[0][0] == RX: + # print("< %s" % binascii.hexlify(self.data[0][1])) + self.network.notify(0x582, self.data.pop(0)[1], 0.0) + + def setUp(self): + network = canopen.Network() + network.send_message = self._send_message + node = network.add_node(2, DATAEDS_PATH) + node.sdo.RESPONSE_TIMEOUT = 0.01 + self.node = node + self.network = network + + def test_boolean(self): + self.data = [ + (TX, b'\x40\x01\x20\x00\x00\x00\x00\x00'), + (RX, b'\x4f\x01\x20\x00\xfe\xfd\xfc\xfb') + ] + data = self.network[2].sdo.upload(0x2000 + dt.BOOLEAN, 0) + self.assertEqual(data, b'\xfe') + + def test_unsigned8(self): + self.data = [ + (TX, b'\x40\x05\x20\x00\x00\x00\x00\x00'), + (RX, b'\x4f\x05\x20\x00\xfe\xfd\xfc\xfb') + ] + data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED8, 0) + self.assertEqual(data, b'\xfe') + + def test_unsigned16(self): + self.data = [ + (TX, b'\x40\x06\x20\x00\x00\x00\x00\x00'), + (RX, b'\x4b\x06\x20\x00\xfe\xfd\xfc\xfb') + ] + data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED16, 0) + self.assertEqual(data, b'\xfe\xfd') + + def test_unsigned24(self): + self.data = [ + (TX, b'\x40\x16\x20\x00\x00\x00\x00\x00'), + (RX, b'\x47\x16\x20\x00\xfe\xfd\xfc\xfb') + ] + data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED24, 0) + self.assertEqual(data, b'\xfe\xfd\xfc') + + def test_unsigned32(self): + self.data = [ + (TX, b'\x40\x07\x20\x00\x00\x00\x00\x00'), + (RX, b'\x43\x07\x20\x00\xfe\xfd\xfc\xfb') + ] + data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED32, 0) + self.assertEqual(data, b'\xfe\xfd\xfc\xfb') + + def test_unsigned40(self): + self.data = [ + (TX, b'\x40\x18\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x18\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x05\xb2\x01\x20\x02\x91\x12\x03'), + ] + data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED40, 0) + self.assertEqual(data, b'\xb2\x01\x20\x02\x91') + + def test_unsigned48(self): + self.data = [ + (TX, b'\x40\x19\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x19\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x03\xb2\x01\x20\x02\x91\x12\x03'), + ] + data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED48, 0) + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12') + + def test_unsigned56(self): + self.data = [ + (TX, b'\x40\x1a\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x1a\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x01\xb2\x01\x20\x02\x91\x12\x03'), + ] + data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED56, 0) + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12\x03') + + def test_unsigned64(self): + self.data = [ + (TX, b'\x40\x1b\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x1b\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\xb2\x01\x20\x02\x91\x12\x03'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x1d\x19\x21\x70\xfe\xfd\xfc\xfb'), + ] + data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED64, 0) + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12\x03\x19') + + def test_integer8(self): + self.data = [ + (TX, b'\x40\x02\x20\x00\x00\x00\x00\x00'), + (RX, b'\x4f\x02\x20\x00\xfe\xfd\xfc\xfb') + ] + data = self.network[2].sdo.upload(0x2000 + dt.INTEGER8, 0) + self.assertEqual(data, b'\xfe') + + def test_integer16(self): + self.data = [ + (TX, b'\x40\x03\x20\x00\x00\x00\x00\x00'), + (RX, b'\x4b\x03\x20\x00\xfe\xfd\xfc\xfb') + ] + data = self.network[2].sdo.upload(0x2000 + dt.INTEGER16, 0) + self.assertEqual(data, b'\xfe\xfd') + + def test_integer24(self): + self.data = [ + (TX, b'\x40\x10\x20\x00\x00\x00\x00\x00'), + (RX, b'\x47\x10\x20\x00\xfe\xfd\xfc\xfb') + ] + data = self.network[2].sdo.upload(0x2000 + dt.INTEGER24, 0) + self.assertEqual(data, b'\xfe\xfd\xfc') + + def test_integer32(self): + self.data = [ + (TX, b'\x40\x04\x20\x00\x00\x00\x00\x00'), + (RX, b'\x43\x04\x20\x00\xfe\xfd\xfc\xfb') + ] + data = self.network[2].sdo.upload(0x2000 + dt.INTEGER32, 0) + self.assertEqual(data, b'\xfe\xfd\xfc\xfb') + + def test_integer40(self): + self.data = [ + (TX, b'\x40\x12\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x12\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x05\xb2\x01\x20\x02\x91\x12\x03'), + ] + data = self.network[2].sdo.upload(0x2000 + dt.INTEGER40, 0) + self.assertEqual(data, b'\xb2\x01\x20\x02\x91') + + def test_integer48(self): + self.data = [ + (TX, b'\x40\x13\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x13\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x03\xb2\x01\x20\x02\x91\x12\x03'), + ] + data = self.network[2].sdo.upload(0x2000 + dt.INTEGER48, 0) + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12') + + def test_integer56(self): + self.data = [ + (TX, b'\x40\x14\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x14\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x01\xb2\x01\x20\x02\x91\x12\x03'), + ] + data = self.network[2].sdo.upload(0x2000 + dt.INTEGER56, 0) + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12\x03') + + def test_integer64(self): + self.data = [ + (TX, b'\x40\x15\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x15\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\xb2\x01\x20\x02\x91\x12\x03'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x1d\x19\x21\x70\xfe\xfd\xfc\xfb'), + ] + data = self.network[2].sdo.upload(0x2000 + dt.INTEGER64, 0) + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12\x03\x19') + + def test_real32(self): + self.data = [ + (TX, b'\x40\x08\x20\x00\x00\x00\x00\x00'), + (RX, b'\x43\x08\x20\x00\xfe\xfd\xfc\xfb') + ] + data = self.network[2].sdo.upload(0x2000 + dt.REAL32, 0) + self.assertEqual(data, b'\xfe\xfd\xfc\xfb') + + def test_real64(self): + self.data = [ + (TX, b'\x40\x11\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x11\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\xb2\x01\x20\x02\x91\x12\x03'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x1d\x19\x21\x70\xfe\xfd\xfc\xfb'), + ] + data = self.network[2].sdo.upload(0x2000 + dt.REAL64, 0) + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12\x03\x19') + + def test_visible_string(self): + self.data = [ + (TX, b'\x40\x09\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x09\x20\x00\x1A\x00\x00\x00'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x54\x69\x6E\x79\x20\x4E\x6F'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x10\x64\x65\x20\x2D\x20\x4D\x65'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x67\x61\x20\x44\x6F\x6D\x61'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x15\x69\x6E\x73\x20\x21\x00\x00') + ] + data = self.network[2].sdo.upload(0x2000 + dt.VISIBLE_STRING, 0) + self.assertEqual(data, b'Tiny Node - Mega Domains !') + + def test_unicode_string(self): + self.data = [ + (TX, b'\x40\x0b\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x0b\x20\x00\x1A\x00\x00\x00'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x54\x69\x6E\x79\x20\x4E\x6F'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x10\x64\x65\x20\x2D\x20\x4D\x65'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x67\x61\x20\x44\x6F\x6D\x61'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x15\x69\x6E\x73\x20\x21\x00\x00') + ] + data = self.network[2].sdo.upload(0x2000 + dt.UNICODE_STRING, 0) + self.assertEqual(data, b'Tiny Node - Mega Domains !') + + def test_octet_string(self): + self.data = [ + (TX, b'\x40\x0a\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x0a\x20\x00\x1A\x00\x00\x00'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x54\x69\x6E\x79\x20\x4E\x6F'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x10\x64\x65\x20\x2D\x20\x4D\x65'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x67\x61\x20\x44\x6F\x6D\x61'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x15\x69\x6E\x73\x20\x21\x00\x00') + ] + data = self.network[2].sdo.upload(0x2000 + dt.OCTET_STRING, 0) + self.assertEqual(data, b'Tiny Node - Mega Domains !') + + def test_domain(self): + self.data = [ + (TX, b'\x40\x0f\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x0f\x20\x00\x1A\x00\x00\x00'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x54\x69\x6E\x79\x20\x4E\x6F'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x10\x64\x65\x20\x2D\x20\x4D\x65'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x67\x61\x20\x44\x6F\x6D\x61'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x15\x69\x6E\x73\x20\x21\x00\x00') + ] + data = self.network[2].sdo.upload(0x2000 + dt.DOMAIN, 0) + self.assertEqual(data, b'Tiny Node - Mega Domains !') + + def test_unknown_od_32(self): + """Test an unknown OD entry of 32 bits (4 bytes).""" + self.data = [ + (TX, b'\x40\xFF\x20\x00\x00\x00\x00\x00'), + (RX, b'\x43\xFF\x20\x00\xfe\xfd\xfc\xfb') + ] + data = self.network[2].sdo.upload(0x20FF, 0) + self.assertEqual(data, b'\xfe\xfd\xfc\xfb') + + def test_unknown_od_112(self): + """Test an unknown OD entry of 112 bits (14 bytes).""" + self.data = [ + (TX, b'\x40\xFF\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\xFF\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\xb2\x01\x20\x02\x91\x12\x03'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x11\x19\x21\x70\xfe\xfd\xfc\xfb'), + ] + data = self.network[2].sdo.upload(0x20FF, 0) + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12\x03\x19\x21\x70\xfe\xfd\xfc\xfb') + + def test_unknown_datatype32(self): + """Test an unknown datatype, but known OD, of 32 bits (4 bytes).""" + return # FIXME: Disabled temporarily until datatype conditionals are fixed, see #436 + # Add fake entry 0x2100 to OD, using fake datatype 0xFF + if 0x2100 not in self.node.object_dictionary: + fake_var = ODVariable("Fake", 0x2100) + fake_var.data_type = 0xFF + self.node.object_dictionary.add_object(fake_var) + self.data = [ + (TX, b'\x40\x00\x21\x00\x00\x00\x00\x00'), + (RX, b'\x43\x00\x21\x00\xfe\xfd\xfc\xfb') + ] + data = self.network[2].sdo.upload(0x2100, 0) + self.assertEqual(data, b'\xfe\xfd\xfc\xfb') + + def test_unknown_datatype112(self): + """Test an unknown datatype, but known OD, of 112 bits (14 bytes).""" + return # FIXME: Disabled temporarily until datatype conditionals are fixed, see #436 + # Add fake entry 0x2100 to OD, using fake datatype 0xFF + if 0x2100 not in self.node.object_dictionary: + fake_var = ODVariable("Fake", 0x2100) + fake_var.data_type = 0xFF + self.node.object_dictionary.add_object(fake_var) + self.data = [ + (TX, b'\x40\x00\x21\x00\x00\x00\x00\x00'), + (RX, b'\x41\x00\x21\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\xb2\x01\x20\x02\x91\x12\x03'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x11\x19\x21\x70\xfe\xfd\xfc\xfb'), + ] + data = self.network[2].sdo.upload(0x2100, 0) + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12\x03\x19\x21\x70\xfe\xfd\xfc\xfb') + if __name__ == "__main__": unittest.main()