Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pdo: Skip saving if no COB-ID was set (fixes #445) #446

Merged
merged 3 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 39 additions & 39 deletions canopen/pdo/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ def __getitem_by_name(self, value):
value, ', '.join(valid_values)))

def __getitem__(self, key: Union[int, str]) -> "PdoVariable":
var = None
if isinstance(key, int):
# there is a maximum available of 8 slots per PDO map
if key in range(0, 8):
Expand Down Expand Up @@ -360,7 +359,7 @@ def _raw_from(param):
subindex = (value >> 8) & 0xFF
# Ignore the highest bit, it is never valid for <= 64 PDO length
size = value & 0x7F
if hasattr(self.pdo_node.node, "curtis_hack") and self.pdo_node.node.curtis_hack: # Curtis HACK: mixed up field order
if getattr(self.pdo_node.node, "curtis_hack", False): # Curtis HACK: mixed up field order
friederschueler marked this conversation as resolved.
Show resolved Hide resolved
index = value & 0xFFFF
subindex = (value >> 16) & 0xFF
size = (value >> 24) & 0x7F
Expand All @@ -371,8 +370,10 @@ def _raw_from(param):

def save(self) -> None:
"""Save PDO configuration for this map using SDO."""
logger.info("Setting COB-ID 0x%X and temporarily disabling PDO",
self.cob_id)
if self.cob_id is None:
logger.debug("Skipping %s: no cob-id set", self.com_record.od.name)
friederschueler marked this conversation as resolved.
Show resolved Hide resolved
return
logger.info("Setting COB-ID 0x%X and temporarily disabling PDO", self.cob_id)
self.com_record[1].raw = self.cob_id | PDO_NOT_VALID | (RTR_NOT_ALLOWED if not self.rtr_allowed else 0x0)
if self.trans_type is not None:
logger.info("Setting transmission type to %d", self.trans_type)
Expand All @@ -387,39 +388,38 @@ def save(self) -> None:
logger.info("Setting SYNC start value to %d", self.sync_start_value)
self.com_record[6].raw = self.sync_start_value

if self.map is not None:
try:
self.map_array[0].raw = 0
except SdoAbortedError:
# WORKAROUND for broken implementations: If the array has a
# fixed number of entries (count not writable), generate dummy
# mappings for an invalid object 0x0000:00 to overwrite any
# excess entries with all-zeros.
self._fill_map(self.map_array[0].raw)
subindex = 1
for var in self.map:
logger.info("Writing %s (0x%X:%d, %d bits) to PDO map",
var.name, var.index, var.subindex, var.length)
if hasattr(self.pdo_node.node, "curtis_hack") and self.pdo_node.node.curtis_hack: # Curtis HACK: mixed up field order
self.map_array[subindex].raw = (var.index |
var.subindex << 16 |
var.length << 24)
else:
self.map_array[subindex].raw = (var.index << 16 |
var.subindex << 8 |
var.length)
subindex += 1
try:
self.map_array[0].raw = len(self.map)
except SdoAbortedError as e:
# WORKAROUND for broken implementations: If the array
# number-of-entries parameter is not writable, we have already
# generated the required number of mappings above.
if e.code != 0x06010002:
# Abort codes other than "Attempt to write a read-only
# object" should still be reported.
raise
self._update_data_size()
try:
self.map_array[0].raw = 0
except SdoAbortedError:
# WORKAROUND for broken implementations: If the array has a
# fixed number of entries (count not writable), generate dummy
# mappings for an invalid object 0x0000:00 to overwrite any
# excess entries with all-zeros.
self._fill_map(self.map_array[0].raw)
subindex = 1
for var in self.map:
logger.info("Writing %s (0x%X:%d, %d bits) to PDO map",
var.name, var.index, var.subindex, var.length)
if getattr(self.pdo_node.node, "curtis_hack", False): # Curtis HACK: mixed up field order
friederschueler marked this conversation as resolved.
Show resolved Hide resolved
self.map_array[subindex].raw = (var.index |
var.subindex << 16 |
var.length << 24)
else:
self.map_array[subindex].raw = (var.index << 16 |
var.subindex << 8 |
var.length)
subindex += 1
try:
self.map_array[0].raw = len(self.map)
except SdoAbortedError as e:
# WORKAROUND for broken implementations: If the array
# number-of-entries parameter is not writable, we have already
# generated the required number of mappings above.
if e.code != 0x06010002:
# Abort codes other than "Attempt to write a read-only
# object" should still be reported.
raise
self._update_data_size()

if self.enabled:
self.com_record[1].raw = self.cob_id | (RTR_NOT_ALLOWED if not self.rtr_allowed else 0x0)
Expand Down Expand Up @@ -521,7 +521,7 @@ def remote_request(self) -> None:
Silently ignore if not allowed.
"""
if self.enabled and self.rtr_allowed:
self.pdo_node.network.send_message(self.cob_id, None, remote=True)
self.pdo_node.network.send_message(self.cob_id, bytes(), remote=True)
friederschueler marked this conversation as resolved.
Show resolved Hide resolved

def wait_for_reception(self, timeout: float = 10) -> float:
"""Wait for the next transmit PDO.
Expand Down Expand Up @@ -600,7 +600,7 @@ def set_data(self, data: bytes):
cur_msg_data = cur_msg_data & bitwise_not
# Set the new data on the correct position
data = (data << bit_offset) | cur_msg_data
data = od_struct.pack_into(self.pdo_parent.data, byte_offset, data)
od_struct.pack_into(self.pdo_parent.data, byte_offset, data)
else:
self.pdo_parent.data[byte_offset:byte_offset + len(data)] = data

Expand Down
5 changes: 5 additions & 0 deletions test/test_pdo.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ def test_bit_mapping(self):
self.assertEqual(node.tpdo[0x2002].raw, 0xf)
self.assertEqual(node.pdo[0x1600][0x2002].raw, 0xf)

def test_save_pdo(self):
node = canopen.Node(1, EDS_PATH)
node.tpdo.save()
node.rpdo.save()


if __name__ == "__main__":
unittest.main()
Loading