Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable message_filters support of python interfaces and tests #7

Merged
merged 13 commits into from
Sep 26, 2018
Merged
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rcutils)
find_package(builtin_interfaces REQUIRED)
find_package(std_msgs REQUIRED)
gaoethan marked this conversation as resolved.
Show resolved Hide resolved
find_package(sensor_msgs REQUIRED)

include_directories(include)
add_library(${PROJECT_NAME} SHARED src/connection.cpp)
ament_target_dependencies(${PROJECT_NAME}
"rclcpp"
"rcutils"
"std_msgs"
gaoethan marked this conversation as resolved.
Show resolved Hide resolved
"builtin_interfaces"
"sensor_msgs")

Expand Down Expand Up @@ -108,6 +110,11 @@ if(BUILD_TESTING)
target_link_libraries(${PROJECT_NAME}-test_fuzz ${PROJECT_NAME})
endif()

# python tests with python interfaces of message filters
find_package(ament_cmake_pytest REQUIRED)
ament_add_pytest_test(directed.py "test/directed.py")
ament_add_pytest_test(test_approxsync.py "test/test_approxsync.py")
ament_add_pytest_test(test_message_filters_cache.py "test/test_message_filters_cache.py")
endif()

ament_package()
1 change: 1 addition & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<build_depend>builtin_interfaces</build_depend>
<build_depend>rclcpp</build_depend>
<build_depend>rclpy</build_depend>
<build_depend>std_msgs</build_depend>
gaoethan marked this conversation as resolved.
Show resolved Hide resolved
<build_depend>sensor_msgs</build_depend>

<test_depend>ament_lint_auto</test_depend>
Expand Down
70 changes: 43 additions & 27 deletions src/message_filters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,15 @@
======================
"""

from functools import reduce
import itertools
import threading
import rospy
import rclpy

from rclpy.clock import ROSClock
from rclpy.duration import Duration
from rclpy.logging import LoggingSeverity
from rclpy.time import Time


class SimpleFilter(object):
Expand All @@ -43,9 +49,8 @@ def __init__(self):
def registerCallback(self, cb, *args):
"""
Register a callback function `cb` to be called when this filter
has output.
The filter calls the function ``cb`` with a filter-dependent list of arguments,
followed by the call-supplied arguments ``args``.
has output. The filter calls the function ``cb`` with a filter-dependent
gaoethan marked this conversation as resolved.
Show resolved Hide resolved
list of arguments,followed by the call-supplied arguments ``args``.
"""

conn = len(self.callbacks)
Expand All @@ -59,17 +64,18 @@ def signalMessage(self, *msg):
class Subscriber(SimpleFilter):

"""
ROS subscription filter. Identical arguments as :class:`rospy.Subscriber`.
ROS2 subscription filter,Identical arguments as :class:`rclpy.Subscriber`.

This class acts as a highest-level filter, simply passing messages
from a ROS subscription through to the filters which have connected
from a ROS2 subscription through to the filters which have connected
to it.
"""
def __init__(self, *args, **kwargs):
SimpleFilter.__init__(self)
self.topic = args[0]
self.node = args[0]
self.topic = args[2]
kwargs['callback'] = self.callback
self.sub = rospy.Subscriber(*args, **kwargs)
self.sub = self.node.create_subscription(*args[1:], **kwargs)

def callback(self, msg):
self.signalMessage(msg)
Expand Down Expand Up @@ -113,11 +119,16 @@ def connectInput(self, f):
def add(self, msg):
if not hasattr(msg, 'header') or not hasattr(msg.header, 'stamp'):
if not self.allow_headerless:
rospy.logwarn("Cannot use message filters with non-stamped messages. "
"Use the 'allow_headerless' constructor option to "
"auto-assign ROS time to headerless messages.")
rclpy.logging._root_logger.log("can not use message filters"
gaoethan marked this conversation as resolved.
Show resolved Hide resolved
"for messages without timestamp infomation when"
"'allow_headerless' is disabled. auto assign"
"ROSTIME to headerless messages once enabling"
"constructor option of 'allow_headerless'.",
LoggingSeverity.INFO)

return
stamp = rospy.Time.now()

stamp = ROSClock().now()
else:
stamp = msg.header.stamp

Expand Down Expand Up @@ -180,11 +191,11 @@ class TimeSynchronizer(SimpleFilter):
Synchronizes messages by their timestamps.

:class:`TimeSynchronizer` synchronizes incoming message filters by the
timestamps contained in their messages' headers. TimeSynchronizer
listens on multiple input message filters ``fs``, and invokes the callback
when it has a collection of messages with matching timestamps.
timestamps contained in their messages' headers. TimeSynchronizer listens
on multiple input message filters ``fs``, and invokes the callback when
it has a collection of messages with matching timestamps.

The signature of the callback function is::
The signature of the callback function is:

def callback(msg1, ... msgN):

Expand Down Expand Up @@ -227,27 +238,32 @@ class ApproximateTimeSynchronizer(TimeSynchronizer):
"""
Approximately synchronizes messages by their timestamps.

:class:`ApproximateTimeSynchronizer` synchronizes incoming message filters by the
timestamps contained in their messages' headers. The API is the same as TimeSynchronizer
except for an extra `slop` parameter in the constructor that defines the delay (in seconds)
with which messages can be synchronized. The ``allow_headerless`` option specifies whether
to allow storing headerless messages with current ROS time instead of timestamp. You should
:class:`ApproximateTimeSynchronizer` synchronizes incoming message filters
by the timestamps contained in their messages' headers. The API is the same
as TimeSynchronizer except for an extra `slop` parameter in the constructor
that defines the delay (in seconds) with which messages can be synchronized.
The ``allow_headerless`` option specifies whether to allow storing
headerless messages with current ROS time instead of timestamp. You should
avoid this as much as you can, since the delays are unpredictable.
"""

def __init__(self, fs, queue_size, slop, allow_headerless=False):
TimeSynchronizer.__init__(self, fs, queue_size)
self.slop = rospy.Duration.from_sec(slop)
self.slop = Duration(seconds=slop).nanoseconds
gaoethan marked this conversation as resolved.
Show resolved Hide resolved
self.allow_headerless = allow_headerless

def add(self, msg, my_queue, my_queue_index=None):
if not hasattr(msg, 'header') or not hasattr(msg.header, 'stamp'):
if not self.allow_headerless:
rospy.logwarn("Cannot use message filters with non-stamped messages. "
"Use the 'allow_headerless' constructor option to "
"auto-assign ROS time to headerless messages.")
rclpy.logging._root_logger.log("can not use message filters"
"for messages without timestamp infomation when"
"'allow_headerless' is disabled. auto assign"
"ROSTIME to headerless messages once enabling"
"constructor option of 'allow_headerless'.",
LoggingSeverity.INFO)
return
stamp = rospy.Time.now()

stamp = ROSClock().now().nanoseconds
gaoethan marked this conversation as resolved.
Show resolved Hide resolved
else:
stamp = msg.header.stamp

Expand Down Expand Up @@ -275,7 +291,7 @@ def add(self, msg, my_queue, my_queue_index=None):
return
topic_stamps = sorted(topic_stamps, key=lambda x: x[1])
stamps.append(topic_stamps)
for vv in itertools.product(*[zip(*s)[0] for s in stamps]):
for vv in itertools.product(*[list(zip(*s))[0] for s in stamps]):
tfoote marked this conversation as resolved.
Show resolved Hide resolved
vv = list(vv)
# insert the new message
if my_queue_index is not None:
Expand Down
15 changes: 6 additions & 9 deletions test/directed.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
#
# wide.registerCallback(boost::bind(&PersonDataRecorder::wideCB, this, _1, _2, _3, _4));

import rostest
import rospy
import rclpy
import random
import unittest

Expand Down Expand Up @@ -55,7 +54,8 @@ def test_synchronizer(self):
m1.signalMessage(msg1)
self.assertEqual(self.collector, [(msg0, msg1)])

# Scramble sequences of length N. Make sure that TimeSequencer recombines them.
# Scramble sequences of length N.
# Make sure that TimeSequencer recombines them.
random.seed(0)
for N in range(1, 10):
m0 = MockFilter()
Expand All @@ -74,9 +74,6 @@ def test_synchronizer(self):
self.assertEqual(set(self.collector), set(zip(seq0, seq1)))

if __name__ == '__main__':
if 0:
rostest.unitrun('message_filters', 'directed', TestDirected)
else:
suite = unittest.TestSuite()
suite.addTest(TestDirected('test_synchronizer'))
unittest.TextTestRunner(verbosity=2).run(suite)
suite = unittest.TestSuite()
suite.addTest(TestDirected('test_synchronizer'))
unittest.TextTestRunner(verbosity=2).run(suite)
50 changes: 27 additions & 23 deletions test/test_approxsync.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
#
# Software License Agreement (BSD License)
#
Expand Down Expand Up @@ -32,14 +32,16 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import rostest
import rospy
import unittest
import random

import message_filters
from message_filters import ApproximateTimeSynchronizer

import random
import rclpy
from rclpy.clock import ROSClock
from rclpy.duration import Duration
from rclpy.time import Time
import unittest

class MockHeader:
pass

Expand Down Expand Up @@ -78,13 +80,16 @@ def test_approx(self):
m1.signalMessage(msg1)
self.assertEqual(self.collector, [(msg0, msg1)])

# Scramble sequences of length N. Make sure that TimeSequencer recombines them.
# Scramble sequences of length N.
# Make sure that TimeSequencer recombines them.
random.seed(0)
for N in range(1, 10):
m0 = MockFilter()
m1 = MockFilter()
seq0 = [MockMessage(rospy.Time(t), random.random()) for t in range(N)]
seq1 = [MockMessage(rospy.Time(t), random.random()) for t in range(N)]
seq0 = [MockMessage(Time(seconds=t).nanoseconds, random.random())\
for t in range(N)]
seq1 = [MockMessage(Time(seconds=t).nanoseconds, random.random())\
for t in range(N)]
# random.shuffle(seq0)
ts = ApproximateTimeSynchronizer([m0, m1], N, 0.1)
ts.registerCallback(self.cb_collector_2msg)
Expand All @@ -96,32 +101,31 @@ def test_approx(self):
m1.signalMessage(msg)
self.assertEqual(set(self.collector), set(zip(seq0, seq1)))

# Scramble sequences of length N of headerless and header-having messages.
# Make sure that TimeSequencer recombines them.
rospy.rostime.set_rostime_initialized(True)
# test headerless scenarios: scramble sequences of length N of
# headerless and header-having messages.
random.seed(0)
for N in range(1, 10):
m0 = MockFilter()
m1 = MockFilter()
seq0 = [MockMessage(rospy.Time(t), random.random()) for t in range(N)]
currentRosTime = ROSClock().now().nanoseconds
seq0 = [MockMessage((currentRosTime + Duration(seconds=t).nanoseconds),
random.random()) for t in range(N)]
seq1 = [MockHeaderlessMessage(random.random()) for t in range(N)]
# random.shuffle(seq0)
ts = ApproximateTimeSynchronizer([m0, m1], N, 0.1, allow_headerless=True)

ts = ApproximateTimeSynchronizer([m0, m1], N, 10,
allow_headerless=True)
ts.registerCallback(self.cb_collector_2msg)
self.collector = []
for msg in random.sample(seq0, N):
m0.signalMessage(msg)
self.assertEqual(self.collector, [])
for i in random.sample(range(N), N):

for i in range(N):
msg = seq1[i]
rospy.rostime._set_rostime(rospy.Time(i+0.05))
m1.signalMessage(msg)
self.assertEqual(set(self.collector), set(zip(seq0, seq1)))

if __name__ == '__main__':
if 1:
rostest.unitrun('camera_calibration', 'testapproxsync', TestApproxSync)
else:
suite = unittest.TestSuite()
suite.addTest(TestApproxSync('test_approx'))
unittest.TextTestRunner(verbosity=2).run(suite)
suite = unittest.TestSuite()
suite.addTest(TestApproxSync('test_approx'))
unittest.TextTestRunner(verbosity=2).run(suite)
Loading