diff --git a/canopen_fake_slaves/include/canopen_fake_slaves/basic_slave.hpp b/canopen_fake_slaves/include/canopen_fake_slaves/basic_slave.hpp index 62a6dc97..ab90bc0b 100644 --- a/canopen_fake_slaves/include/canopen_fake_slaves/basic_slave.hpp +++ b/canopen_fake_slaves/include/canopen_fake_slaves/basic_slave.hpp @@ -24,6 +24,7 @@ #include #include +#include #include "canopen_fake_slaves/base_slave.hpp" #include "lifecycle_msgs/msg/state.hpp" @@ -38,6 +39,59 @@ class SimpleSlave : public canopen::BasicSlave using BasicSlave::BasicSlave; protected: + /** + * @brief This function gets an object value through the typed interface. + * Only supports object types that can fit in a 32-bit container. + * @param idx The index of the PDO. + * @param subidx The subindex of the PDO. + * @return value of object stored in a 32-bit container + */ + uint32_t GetValue(const uint16_t idx, const uint8_t subidx) const noexcept + { + auto & type = (*this)[idx][subidx].Type(); + + uint32_t value{0}; + + if (type == typeid(bool)) + { + value = static_cast((*this)[idx][subidx].Get()); + } + else if (type == typeid(int8_t)) + { + value = static_cast((*this)[idx][subidx].Get()); + } + else if (type == typeid(int16_t)) + { + value = static_cast((*this)[idx][subidx].Get()); + } + else if (type == typeid(int32_t)) + { + value = static_cast((*this)[idx][subidx].Get()); + } + else if (type == typeid(float)) + { + value = static_cast((*this)[idx][subidx].Get()); + } + else if (type == typeid(uint8_t)) + { + value = static_cast((*this)[idx][subidx].Get()); + } + else if (type == typeid(uint16_t)) + { + value = static_cast((*this)[idx][subidx].Get()); + } + else if (type == typeid(uint32_t)) + { + value = (*this)[idx][subidx].Get(); + } + else + { + value = (*this)[idx][subidx].Get(); + } + + return value; + } + /** * @brief This function is called when a value is written to the local object dictionary by an SDO * or RPDO. Also copies the RPDO value to TPDO. @@ -46,8 +100,7 @@ class SimpleSlave : public canopen::BasicSlave */ void OnWrite(uint16_t idx, uint8_t subidx) noexcept override { - uint32_t val = (*this)[idx][subidx]; - (*this)[0x4001][0] = val; + (*this)[0x4001][0] = this->GetValue(idx, subidx); this->TpdoEvent(0); } }; diff --git a/canopen_tests/config/simple/simple.eds b/canopen_tests/config/simple/simple.eds index 68d9bb2a..c4ea3fa9 100644 --- a/canopen_tests/config/simple/simple.eds +++ b/canopen_tests/config/simple/simple.eds @@ -63,11 +63,12 @@ SupportedObjects=11 11=0x1A00 [ManufacturerObjects] -SupportedObjects=4 +SupportedObjects=5 1=0x4000 2=0x4001 3=0x4002 4=0x4003 +5=0x4004 [1000] ParameterName=Device type @@ -280,3 +281,56 @@ AccessType=rw ParameterName=INTEGER16 test DataType=0x0003 AccessType=rw + +[4004] +ParameterName=Test Data +ObjectType=0x9 +SubNumber=0x8 + +[4004sub0] +ParameterName=Highest sub-index supported +ObjectType=0x7 +DataType=0x0005 +AccessType=ro + +[4004sub1] +ParameterName=BOOL test data +ObjectType=0x7 +DataType=0x0001 +AccessType=rw + +[4004sub2] +ParameterName=SIGNED8 test data +ObjectType=0x7 +DataType=0x0002 +AccessType=rw + +[4004sub3] +ParameterName=SIGNED16 test data +ObjectType=0x7 +DataType=0x0003 +AccessType=rw + +[4004sub4] +ParameterName=SIGNED32 test data +ObjectType=0x7 +DataType=0x0004 +AccessType=rw + +[4004sub5] +ParameterName=UNSIGNED8 test data +ObjectType=0x7 +DataType=0x0005 +AccessType=rw + +[4004sub6] +ParameterName=UNSIGNED16 test data +ObjectType=0x7 +DataType=0x0006 +AccessType=rw + +[4004sub7] +ParameterName=UNSIGNED32 test data +ObjectType=0x7 +DataType=0x0007 +AccessType=rw diff --git a/canopen_tests/launch_tests/test_proxy_driver.py b/canopen_tests/launch_tests/test_proxy_driver.py index c90f7ed0..d0be7ff2 100644 --- a/canopen_tests/launch_tests/test_proxy_driver.py +++ b/canopen_tests/launch_tests/test_proxy_driver.py @@ -144,6 +144,204 @@ def test_sdo_write(self): self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res) time.sleep(0.01) + def test_sdo_write_signed8(self): + """Tests SDO writing to a SIGNED8 data object.""" + index = 0x4004 + subindex = 2 + data = 100 + + req = COWrite.Request() + req.index = index + req.subindex = subindex + req.data = data + + res = COWrite.Response() + res.success = True + + rreq = CORead.Request() + rreq.index = index + rreq.subindex = subindex + + rres = CORead.Response() + rres.success = True + rres.data = data + + self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res) + self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res) + time.sleep(0.01) + self.node.call_service("proxy_device_1/sdo_read", CORead, rreq, rres) + self.node.call_service("proxy_device_2/sdo_read", CORead, rreq, rres) + req.data = 0 + time.sleep(0.01) + self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res) + self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res) + time.sleep(0.01) + + def test_sdo_write_signed16(self): + """Tests SDO writing to a SIGNED16 data object.""" + index = 0x4004 + subindex = 3 + data = 100 + + req = COWrite.Request() + req.index = index + req.subindex = subindex + req.data = data + + res = COWrite.Response() + res.success = True + + rreq = CORead.Request() + rreq.index = index + rreq.subindex = subindex + + rres = CORead.Response() + rres.success = True + rres.data = data + + self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res) + self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res) + time.sleep(0.01) + self.node.call_service("proxy_device_1/sdo_read", CORead, rreq, rres) + self.node.call_service("proxy_device_2/sdo_read", CORead, rreq, rres) + req.data = 0 + time.sleep(0.01) + self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res) + self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res) + time.sleep(0.01) + + def test_sdo_write_signed32(self): + """Tests SDO writing to a SIGNED32 data object.""" + index = 0x4004 + subindex = 4 + data = 100 + + req = COWrite.Request() + req.index = index + req.subindex = subindex + req.data = data + + res = COWrite.Response() + res.success = True + + rreq = CORead.Request() + rreq.index = index + rreq.subindex = subindex + + rres = CORead.Response() + rres.success = True + rres.data = data + + self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res) + self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res) + time.sleep(0.01) + self.node.call_service("proxy_device_1/sdo_read", CORead, rreq, rres) + self.node.call_service("proxy_device_2/sdo_read", CORead, rreq, rres) + req.data = 0 + time.sleep(0.01) + self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res) + self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res) + time.sleep(0.01) + + def test_sdo_write_unsigned8(self): + """Tests SDO writing to an UNSIGNED8 data object.""" + index = 0x4004 + subindex = 5 + data = 100 + + req = COWrite.Request() + req.index = index + req.subindex = subindex + req.data = data + + res = COWrite.Response() + res.success = True + + rreq = CORead.Request() + rreq.index = index + rreq.subindex = subindex + + rres = CORead.Response() + rres.success = True + rres.data = data + + self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res) + self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res) + time.sleep(0.01) + self.node.call_service("proxy_device_1/sdo_read", CORead, rreq, rres) + self.node.call_service("proxy_device_2/sdo_read", CORead, rreq, rres) + req.data = 0 + time.sleep(0.01) + self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res) + self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res) + time.sleep(0.01) + + def test_sdo_write_unsigned16(self): + """Tests SDO writing to an UNSIGNED16 data object.""" + index = 0x4004 + subindex = 6 + data = 100 + + req = COWrite.Request() + req.index = index + req.subindex = subindex + req.data = data + + res = COWrite.Response() + res.success = True + + rreq = CORead.Request() + rreq.index = index + rreq.subindex = subindex + + rres = CORead.Response() + rres.success = True + rres.data = data + + self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res) + self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res) + time.sleep(0.01) + self.node.call_service("proxy_device_1/sdo_read", CORead, rreq, rres) + self.node.call_service("proxy_device_2/sdo_read", CORead, rreq, rres) + req.data = 0 + time.sleep(0.01) + self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res) + self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res) + time.sleep(0.01) + + def test_sdo_write_unsigned32(self): + """Tests SDO writing to an UNSIGNED32 data object.""" + index = 0x4004 + subindex = 7 + data = 100 + + req = COWrite.Request() + req.index = index + req.subindex = subindex + req.data = data + + res = COWrite.Response() + res.success = True + + rreq = CORead.Request() + rreq.index = index + rreq.subindex = subindex + + rres = CORead.Response() + rres.success = True + rres.data = data + + self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res) + self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res) + time.sleep(0.01) + self.node.call_service("proxy_device_1/sdo_read", CORead, rreq, rres) + self.node.call_service("proxy_device_2/sdo_read", CORead, rreq, rres) + req.data = 0 + time.sleep(0.01) + self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res) + self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res) + time.sleep(0.01) + class TestSDOMaster(unittest.TestCase): def run_node(self):