Skip to content

Commit

Permalink
Merge branch 'kurtist123-feature/multiple-sdo-write-types'
Browse files Browse the repository at this point in the history
  • Loading branch information
Christoph Hellmann Santos committed Apr 18, 2024
2 parents 53995d9 + e2522d3 commit bdb495f
Show file tree
Hide file tree
Showing 3 changed files with 308 additions and 3 deletions.
57 changes: 55 additions & 2 deletions canopen_fake_slaves/include/canopen_fake_slaves/basic_slave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <lely/io2/sys/timer.hpp>

#include <thread>
#include <typeinfo>

#include "canopen_fake_slaves/base_slave.hpp"
#include "lifecycle_msgs/msg/state.hpp"
Expand All @@ -38,6 +39,59 @@ class SimpleSlave : public canopen::BasicSlave
using BasicSlave::BasicSlave;

protected:
/**
* @brief This function gets an object value through the typed interface.
* Only supports object types that can fit in a 32-bit container.
* @param idx The index of the PDO.
* @param subidx The subindex of the PDO.
* @return value of object stored in a 32-bit container
*/
uint32_t GetValue(const uint16_t idx, const uint8_t subidx) const noexcept
{
auto & type = (*this)[idx][subidx].Type();

uint32_t value{0};

if (type == typeid(bool))
{
value = static_cast<uint32_t>((*this)[idx][subidx].Get<bool>());
}
else if (type == typeid(int8_t))
{
value = static_cast<uint32_t>((*this)[idx][subidx].Get<int8_t>());
}
else if (type == typeid(int16_t))
{
value = static_cast<uint32_t>((*this)[idx][subidx].Get<int16_t>());
}
else if (type == typeid(int32_t))
{
value = static_cast<uint32_t>((*this)[idx][subidx].Get<int32_t>());
}
else if (type == typeid(float))
{
value = static_cast<uint32_t>((*this)[idx][subidx].Get<float>());
}
else if (type == typeid(uint8_t))
{
value = static_cast<uint32_t>((*this)[idx][subidx].Get<uint8_t>());
}
else if (type == typeid(uint16_t))
{
value = static_cast<uint32_t>((*this)[idx][subidx].Get<uint16_t>());
}
else if (type == typeid(uint32_t))
{
value = (*this)[idx][subidx].Get<uint32_t>();
}
else
{
value = (*this)[idx][subidx].Get<uint32_t>();
}

return value;
}

/**
* @brief This function is called when a value is written to the local object dictionary by an SDO
* or RPDO. Also copies the RPDO value to TPDO.
Expand All @@ -46,8 +100,7 @@ class SimpleSlave : public canopen::BasicSlave
*/
void OnWrite(uint16_t idx, uint8_t subidx) noexcept override
{
uint32_t val = (*this)[idx][subidx];
(*this)[0x4001][0] = val;
(*this)[0x4001][0] = this->GetValue(idx, subidx);
this->TpdoEvent(0);
}
};
Expand Down
56 changes: 55 additions & 1 deletion canopen_tests/config/simple/simple.eds
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ SupportedObjects=11
11=0x1A00

[ManufacturerObjects]
SupportedObjects=4
SupportedObjects=5
1=0x4000
2=0x4001
3=0x4002
4=0x4003
5=0x4004

[1000]
ParameterName=Device type
Expand Down Expand Up @@ -280,3 +281,56 @@ AccessType=rw
ParameterName=INTEGER16 test
DataType=0x0003
AccessType=rw

[4004]
ParameterName=Test Data
ObjectType=0x9
SubNumber=0x8

[4004sub0]
ParameterName=Highest sub-index supported
ObjectType=0x7
DataType=0x0005
AccessType=ro

[4004sub1]
ParameterName=BOOL test data
ObjectType=0x7
DataType=0x0001
AccessType=rw

[4004sub2]
ParameterName=SIGNED8 test data
ObjectType=0x7
DataType=0x0002
AccessType=rw

[4004sub3]
ParameterName=SIGNED16 test data
ObjectType=0x7
DataType=0x0003
AccessType=rw

[4004sub4]
ParameterName=SIGNED32 test data
ObjectType=0x7
DataType=0x0004
AccessType=rw

[4004sub5]
ParameterName=UNSIGNED8 test data
ObjectType=0x7
DataType=0x0005
AccessType=rw

[4004sub6]
ParameterName=UNSIGNED16 test data
ObjectType=0x7
DataType=0x0006
AccessType=rw

[4004sub7]
ParameterName=UNSIGNED32 test data
ObjectType=0x7
DataType=0x0007
AccessType=rw
198 changes: 198 additions & 0 deletions canopen_tests/launch_tests/test_proxy_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,204 @@ def test_sdo_write(self):
self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res)
time.sleep(0.01)

def test_sdo_write_signed8(self):
"""Tests SDO writing to a SIGNED8 data object."""
index = 0x4004
subindex = 2
data = 100

req = COWrite.Request()
req.index = index
req.subindex = subindex
req.data = data

res = COWrite.Response()
res.success = True

rreq = CORead.Request()
rreq.index = index
rreq.subindex = subindex

rres = CORead.Response()
rres.success = True
rres.data = data

self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res)
self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res)
time.sleep(0.01)
self.node.call_service("proxy_device_1/sdo_read", CORead, rreq, rres)
self.node.call_service("proxy_device_2/sdo_read", CORead, rreq, rres)
req.data = 0
time.sleep(0.01)
self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res)
self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res)
time.sleep(0.01)

def test_sdo_write_signed16(self):
"""Tests SDO writing to a SIGNED16 data object."""
index = 0x4004
subindex = 3
data = 100

req = COWrite.Request()
req.index = index
req.subindex = subindex
req.data = data

res = COWrite.Response()
res.success = True

rreq = CORead.Request()
rreq.index = index
rreq.subindex = subindex

rres = CORead.Response()
rres.success = True
rres.data = data

self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res)
self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res)
time.sleep(0.01)
self.node.call_service("proxy_device_1/sdo_read", CORead, rreq, rres)
self.node.call_service("proxy_device_2/sdo_read", CORead, rreq, rres)
req.data = 0
time.sleep(0.01)
self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res)
self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res)
time.sleep(0.01)

def test_sdo_write_signed32(self):
"""Tests SDO writing to a SIGNED32 data object."""
index = 0x4004
subindex = 4
data = 100

req = COWrite.Request()
req.index = index
req.subindex = subindex
req.data = data

res = COWrite.Response()
res.success = True

rreq = CORead.Request()
rreq.index = index
rreq.subindex = subindex

rres = CORead.Response()
rres.success = True
rres.data = data

self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res)
self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res)
time.sleep(0.01)
self.node.call_service("proxy_device_1/sdo_read", CORead, rreq, rres)
self.node.call_service("proxy_device_2/sdo_read", CORead, rreq, rres)
req.data = 0
time.sleep(0.01)
self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res)
self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res)
time.sleep(0.01)

def test_sdo_write_unsigned8(self):
"""Tests SDO writing to an UNSIGNED8 data object."""
index = 0x4004
subindex = 5
data = 100

req = COWrite.Request()
req.index = index
req.subindex = subindex
req.data = data

res = COWrite.Response()
res.success = True

rreq = CORead.Request()
rreq.index = index
rreq.subindex = subindex

rres = CORead.Response()
rres.success = True
rres.data = data

self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res)
self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res)
time.sleep(0.01)
self.node.call_service("proxy_device_1/sdo_read", CORead, rreq, rres)
self.node.call_service("proxy_device_2/sdo_read", CORead, rreq, rres)
req.data = 0
time.sleep(0.01)
self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res)
self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res)
time.sleep(0.01)

def test_sdo_write_unsigned16(self):
"""Tests SDO writing to an UNSIGNED16 data object."""
index = 0x4004
subindex = 6
data = 100

req = COWrite.Request()
req.index = index
req.subindex = subindex
req.data = data

res = COWrite.Response()
res.success = True

rreq = CORead.Request()
rreq.index = index
rreq.subindex = subindex

rres = CORead.Response()
rres.success = True
rres.data = data

self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res)
self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res)
time.sleep(0.01)
self.node.call_service("proxy_device_1/sdo_read", CORead, rreq, rres)
self.node.call_service("proxy_device_2/sdo_read", CORead, rreq, rres)
req.data = 0
time.sleep(0.01)
self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res)
self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res)
time.sleep(0.01)

def test_sdo_write_unsigned32(self):
"""Tests SDO writing to an UNSIGNED32 data object."""
index = 0x4004
subindex = 7
data = 100

req = COWrite.Request()
req.index = index
req.subindex = subindex
req.data = data

res = COWrite.Response()
res.success = True

rreq = CORead.Request()
rreq.index = index
rreq.subindex = subindex

rres = CORead.Response()
rres.success = True
rres.data = data

self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res)
self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res)
time.sleep(0.01)
self.node.call_service("proxy_device_1/sdo_read", CORead, rreq, rres)
self.node.call_service("proxy_device_2/sdo_read", CORead, rreq, rres)
req.data = 0
time.sleep(0.01)
self.node.call_service("proxy_device_1/sdo_write", COWrite, req, res)
self.node.call_service("proxy_device_2/sdo_write", COWrite, req, res)
time.sleep(0.01)


class TestSDOMaster(unittest.TestCase):
def run_node(self):
Expand Down

0 comments on commit bdb495f

Please sign in to comment.