Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add back PdoBase.export() dependency (canmatrix) #493

Merged
merged 10 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/pythonpackage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jobs:
fail-fast: false
matrix:
python-version: ['3.x']
features: ['', '[db_export]']

steps:
- uses: actions/checkout@v3
Expand All @@ -28,7 +29,7 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install pytest pytest-cov
pip install -e .
pip install -e '.${{ matrix.features }}'
- name: Test with pytest
run: |
pytest -v --cov=canopen --cov-report=xml --cov-branch
Expand Down
23 changes: 23 additions & 0 deletions canopen/pdo/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import annotations
import functools
import threading
import math
from typing import Callable, Dict, Iterator, List, Optional, Union, TYPE_CHECKING
Expand All @@ -9,13 +10,28 @@
from canopen.sdo import SdoAbortedError
from canopen import objectdictionary
from canopen import variable
try:
import canmatrix
except ImportError:
canmatrix = None

if TYPE_CHECKING:
from canopen.network import Network
from canopen import LocalNode, RemoteNode
from canopen.pdo import RPDO, TPDO
from canopen.sdo import SdoRecord


def _disable_if(condition):
"""Conditionally disable given function/method."""
def deco(func):
@functools.wraps(func)
def wrapper(*args, **kwds):
if not condition:
return func(*args, **kwds)
return wrapper
return deco
erlend-aasland marked this conversation as resolved.
Show resolved Hide resolved

PDO_NOT_VALID = 1 << 31
RTR_NOT_ALLOWED = 1 << 30

Expand Down Expand Up @@ -75,9 +91,16 @@ def subscribe(self):
for pdo_map in self.map.values():
pdo_map.subscribe()

@_disable_if(canmatrix is None)
def export(self, filename):
"""Export current configuration to a database file.

.. note::
This API depends on the :mod:`!canmatrix` optional dependency,
which can be installed by requesting the ``db_export`` feature::

python3 -m pip install 'canopen[db_export]'

:param str filename:
Filename to save to (e.g. DBC, DBF, ARXML, KCD etc)

Expand Down
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ dependencies = [
]
dynamic = ["version"]

[project.optional-dependencies]
db_export = [
"canmatrix ~= 1.0",
]

[project.urls]
documentation = "https://canopen.readthedocs.io/en/stable/"
repository = "https://github.com/christiansandberg/canopen"
Expand Down
79 changes: 51 additions & 28 deletions test/test_pdo.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,51 @@
import os.path
import unittest

import canopen
try:
import canmatrix
except ImportError:
canmatrix = None
erlend-aasland marked this conversation as resolved.
Show resolved Hide resolved

EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds')


class TestPDO(unittest.TestCase):

def test_bit_mapping(self):
def setUp(self):
node = canopen.Node(1, EDS_PATH)
map = node.pdo.tx[1]
map.add_variable('INTEGER16 value') # 0x2001
map.add_variable('UNSIGNED8 value', length=4) # 0x2002
map.add_variable('INTEGER8 value', length=4) # 0x2003
map.add_variable('INTEGER32 value') # 0x2004
map.add_variable('BOOLEAN value', length=1) # 0x2005
map.add_variable('BOOLEAN value 2', length=1) # 0x2006
pdo = node.pdo.tx[1]
pdo.add_variable('INTEGER16 value') # 0x2001
pdo.add_variable('UNSIGNED8 value', length=4) # 0x2002
pdo.add_variable('INTEGER8 value', length=4) # 0x2003
pdo.add_variable('INTEGER32 value') # 0x2004
pdo.add_variable('BOOLEAN value', length=1) # 0x2005
pdo.add_variable('BOOLEAN value 2', length=1) # 0x2006

# Write some values
map['INTEGER16 value'].raw = -3
map['UNSIGNED8 value'].raw = 0xf
map['INTEGER8 value'].raw = -2
map['INTEGER32 value'].raw = 0x01020304
map['BOOLEAN value'].raw = False
map['BOOLEAN value 2'].raw = True
pdo['INTEGER16 value'].raw = -3
pdo['UNSIGNED8 value'].raw = 0xf
pdo['INTEGER8 value'].raw = -2
pdo['INTEGER32 value'].raw = 0x01020304
pdo['BOOLEAN value'].raw = False
pdo['BOOLEAN value 2'].raw = True

self.pdo = pdo
self.node = node

# Check expected data
self.assertEqual(map.data, b'\xfd\xff\xef\x04\x03\x02\x01\x02')
def test_pdo_map_bit_mapping(self):
self.assertEqual(self.pdo.data, b'\xfd\xff\xef\x04\x03\x02\x01\x02')

# Read values from data
self.assertEqual(map['INTEGER16 value'].raw, -3)
self.assertEqual(map['UNSIGNED8 value'].raw, 0xf)
self.assertEqual(map['INTEGER8 value'].raw, -2)
self.assertEqual(map['INTEGER32 value'].raw, 0x01020304)
self.assertEqual(map['BOOLEAN value'].raw, False)
self.assertEqual(map['BOOLEAN value 2'].raw, True)
def test_pdo_map_getitem(self):
pdo = self.pdo
self.assertEqual(pdo['INTEGER16 value'].raw, -3)
self.assertEqual(pdo['UNSIGNED8 value'].raw, 0xf)
self.assertEqual(pdo['INTEGER8 value'].raw, -2)
self.assertEqual(pdo['INTEGER32 value'].raw, 0x01020304)
self.assertEqual(pdo['BOOLEAN value'].raw, False)
self.assertEqual(pdo['BOOLEAN value 2'].raw, True)

def test_pdo_getitem(self):
node = self.node
self.assertEqual(node.tpdo[1]['INTEGER16 value'].raw, -3)
self.assertEqual(node.tpdo[1]['UNSIGNED8 value'].raw, 0xf)
self.assertEqual(node.tpdo[1]['INTEGER8 value'].raw, -2)
Expand All @@ -55,10 +65,23 @@ def test_bit_mapping(self):
self.assertEqual(node.tpdo[0x2002].raw, 0xf)
self.assertEqual(node.pdo[0x1600][0x2002].raw, 0xf)

def test_save_pdo(self):
node = canopen.Node(1, EDS_PATH)
node.tpdo.save()
node.rpdo.save()
def test_pdo_save(self):
self.node.tpdo.save()
self.node.rpdo.save()

@unittest.skipIf(canmatrix is None, "Requires canmatrix dependency")
erlend-aasland marked this conversation as resolved.
Show resolved Hide resolved
def test_pdo_export(self):
import tempfile

for pdo in "tpdo", "rpdo":
with tempfile.NamedTemporaryFile(suffix=".csv") as tmp:
fn = tmp.name
with self.subTest(pdo=pdo, filename=fn):
erlend-aasland marked this conversation as resolved.
Show resolved Hide resolved
getattr(self.node, pdo).export(fn)
with open(fn) as csv:
header = csv.readline()
self.assertIn("ID", header)
self.assertIn("Frame Name", header)


if __name__ == "__main__":
Expand Down
Loading