Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable message_filters support of python interfaces and tests #7

Merged
merged 13 commits into from
Sep 26, 2018
Merged
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ if(BUILD_TESTING)
target_link_libraries(${PROJECT_NAME}-test_fuzz ${PROJECT_NAME})
endif()

# python tests with python interfaces of message filters
find_package(ament_cmake_pytest REQUIRED)
ament_add_pytest_test(directed.py "test/directed.py")
ament_add_pytest_test(test_approxsync.py "test/test_approxsync.py")
ament_add_pytest_test(test_message_filters_cache.py "test/test_message_filters_cache.py")
endif()

ament_package()
85 changes: 53 additions & 32 deletions src/message_filters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,15 @@
======================
"""

from functools import reduce
import itertools
import threading
import rospy
import rclpy

from rclpy.clock import ROSClock
from rclpy.duration import Duration
from rclpy.logging import LoggingSeverity
from rclpy.time import Time


class SimpleFilter(object):
Expand All @@ -44,8 +50,8 @@ def registerCallback(self, cb, *args):
"""
Register a callback function `cb` to be called when this filter
has output.
The filter calls the function ``cb`` with a filter-dependent list of arguments,
followed by the call-supplied arguments ``args``.
The filter calls the function ``cb`` with a filter-dependent
list of arguments,followed by the call-supplied arguments ``args``.
"""

conn = len(self.callbacks)
Expand All @@ -59,17 +65,18 @@ def signalMessage(self, *msg):
class Subscriber(SimpleFilter):

"""
ROS subscription filter. Identical arguments as :class:`rospy.Subscriber`.
ROS2 subscription filter,Identical arguments as :class:`rclpy.Subscriber`.

This class acts as a highest-level filter, simply passing messages
from a ROS subscription through to the filters which have connected
from a ROS2 subscription through to the filters which have connected
to it.
"""
def __init__(self, *args, **kwargs):
SimpleFilter.__init__(self)
self.topic = args[0]
self.node = args[0]
self.topic = args[2]
kwargs['callback'] = self.callback
self.sub = rospy.Subscriber(*args, **kwargs)
self.sub = self.node.create_subscription(*args[1:], **kwargs)

def callback(self, msg):
self.signalMessage(msg)
Expand Down Expand Up @@ -113,11 +120,18 @@ def connectInput(self, f):
def add(self, msg):
if not hasattr(msg, 'header') or not hasattr(msg.header, 'stamp'):
if not self.allow_headerless:
rospy.logwarn("Cannot use message filters with non-stamped messages. "
"Use the 'allow_headerless' constructor option to "
"auto-assign ROS time to headerless messages.")
msg_filters_logger = rclpy.logging.get_logger('message_filters_cache')
msg_filters_logger.set_level(LoggingSeverity.INFO)
msg_filters_logger.warn("can not use message filters messages "
"without timestamp infomation when "
"'allow_headerless' is disabled. "
"auto assign ROSTIME to headerless "
"messages once enabling constructor "
"option of 'allow_headerless'.")

return
stamp = rospy.Time.now()

stamp = ROSClock().now()
else:
stamp = msg.header.stamp

Expand Down Expand Up @@ -180,11 +194,11 @@ class TimeSynchronizer(SimpleFilter):
Synchronizes messages by their timestamps.

:class:`TimeSynchronizer` synchronizes incoming message filters by the
timestamps contained in their messages' headers. TimeSynchronizer
listens on multiple input message filters ``fs``, and invokes the callback
when it has a collection of messages with matching timestamps.
timestamps contained in their messages' headers. TimeSynchronizer listens
on multiple input message filters ``fs``, and invokes the callback when
it has a collection of messages with matching timestamps.

The signature of the callback function is::
The signature of the callback function is:

def callback(msg1, ... msgN):

Expand Down Expand Up @@ -227,32 +241,38 @@ class ApproximateTimeSynchronizer(TimeSynchronizer):
"""
Approximately synchronizes messages by their timestamps.

:class:`ApproximateTimeSynchronizer` synchronizes incoming message filters by the
timestamps contained in their messages' headers. The API is the same as TimeSynchronizer
except for an extra `slop` parameter in the constructor that defines the delay (in seconds)
with which messages can be synchronized. The ``allow_headerless`` option specifies whether
to allow storing headerless messages with current ROS time instead of timestamp. You should
:class:`ApproximateTimeSynchronizer` synchronizes incoming message filters
by the timestamps contained in their messages' headers. The API is the same
as TimeSynchronizer except for an extra `slop` parameter in the constructor
that defines the delay (in seconds) with which messages can be synchronized.
The ``allow_headerless`` option specifies whether to allow storing
headerless messages with current ROS time instead of timestamp. You should
avoid this as much as you can, since the delays are unpredictable.
"""

def __init__(self, fs, queue_size, slop, allow_headerless=False):
TimeSynchronizer.__init__(self, fs, queue_size)
self.slop = rospy.Duration.from_sec(slop)
self.slop = Duration(seconds=slop)
self.allow_headerless = allow_headerless

def add(self, msg, my_queue, my_queue_index=None):
if not hasattr(msg, 'header') or not hasattr(msg.header, 'stamp'):
if not self.allow_headerless:
rospy.logwarn("Cannot use message filters with non-stamped messages. "
"Use the 'allow_headerless' constructor option to "
"auto-assign ROS time to headerless messages.")
msg_filters_logger = rclpy.logging.get_logger('message_filters_approx')
msg_filters_logger.set_level(LoggingSeverity.INFO)
msg_filters_logger.warn("can not use message filters for "
"messages without timestamp infomation when "
"'allow_headerless' is disabled. auto assign "
"ROSTIME to headerless messages once enabling "
"constructor option of 'allow_headerless'.")
return
stamp = rospy.Time.now()

stamp = ROSClock().now()
else:
stamp = msg.header.stamp

self.lock.acquire()
my_queue[stamp] = msg
my_queue[stamp.nanoseconds] = msg
while len(my_queue) > self.queue_size:
del my_queue[min(my_queue)]
# self.queues = [topic_0 {stamp: msg}, topic_1 {stamp: msg}, ...]
Expand All @@ -266,26 +286,27 @@ def add(self, msg, my_queue, my_queue_index=None):
for queue in search_queues:
topic_stamps = []
for s in queue:
stamp_delta = abs(s - stamp)
stamp_delta = Duration(nanoseconds=abs(s - stamp.nanoseconds))
if stamp_delta > self.slop:
continue # far over the slop
topic_stamps.append((s, stamp_delta))
topic_stamps.append(((Time(nanoseconds=s,
clock_type=stamp.clock_type)), stamp_delta))
if not topic_stamps:
self.lock.release()
return
topic_stamps = sorted(topic_stamps, key=lambda x: x[1])
stamps.append(topic_stamps)
for vv in itertools.product(*[zip(*s)[0] for s in stamps]):
for vv in itertools.product(*[list(zip(*s))[0] for s in stamps]):
tfoote marked this conversation as resolved.
Show resolved Hide resolved
vv = list(vv)
# insert the new message
if my_queue_index is not None:
vv.insert(my_queue_index, stamp)
qt = list(zip(self.queues, vv))
if ( ((max(vv) - min(vv)) < self.slop) and
(len([1 for q,t in qt if t not in q]) == 0) ):
msgs = [q[t] for q,t in qt]
(len([1 for q,t in qt if t.nanoseconds not in q]) == 0) ):
msgs = [q[t.nanoseconds] for q,t in qt]
self.signalMessage(*msgs)
for q,t in qt:
del q[t]
del q[t.nanoseconds]
break # fast finish after the synchronization
self.lock.release()
15 changes: 6 additions & 9 deletions test/directed.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
#
# wide.registerCallback(boost::bind(&PersonDataRecorder::wideCB, this, _1, _2, _3, _4));

import rostest
import rospy
import rclpy
import random
import unittest

Expand Down Expand Up @@ -55,7 +54,8 @@ def test_synchronizer(self):
m1.signalMessage(msg1)
self.assertEqual(self.collector, [(msg0, msg1)])

# Scramble sequences of length N. Make sure that TimeSequencer recombines them.
# Scramble sequences of length N.
# Make sure that TimeSequencer recombines them.
random.seed(0)
for N in range(1, 10):
m0 = MockFilter()
Expand All @@ -74,9 +74,6 @@ def test_synchronizer(self):
self.assertEqual(set(self.collector), set(zip(seq0, seq1)))

if __name__ == '__main__':
if 0:
rostest.unitrun('message_filters', 'directed', TestDirected)
else:
suite = unittest.TestSuite()
suite.addTest(TestDirected('test_synchronizer'))
unittest.TextTestRunner(verbosity=2).run(suite)
suite = unittest.TestSuite()
suite.addTest(TestDirected('test_synchronizer'))
unittest.TextTestRunner(verbosity=2).run(suite)
69 changes: 36 additions & 33 deletions test/test_approxsync.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
#
# Software License Agreement (BSD License)
#
Expand Down Expand Up @@ -32,14 +32,16 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import rostest
import rospy
import unittest
import random

import message_filters
from message_filters import ApproximateTimeSynchronizer

import random
import rclpy
from rclpy.clock import ROSClock
from rclpy.duration import Duration
from rclpy.time import Time
import unittest

class MockHeader:
pass

Expand All @@ -62,29 +64,32 @@ def cb_collector_2msg(self, msg1, msg2):
self.collector.append((msg1, msg2))

def test_approx(self):
# Simple case, pairs of messages,
# make sure that they get combined
m0 = MockFilter()
m1 = MockFilter()
ts = ApproximateTimeSynchronizer([m0, m1], 1, 0.1)
ts.registerCallback(self.cb_collector_2msg)

if 0:
# Simple case, pairs of messages, make sure that they get combined
for t in range(10):
self.collector = []
msg0 = MockMessage(t, 33)
msg1 = MockMessage(t, 34)
m0.signalMessage(msg0)
self.assertEqual(self.collector, [])
m1.signalMessage(msg1)
self.assertEqual(self.collector, [(msg0, msg1)])
for t in range(10):
self.collector = []
msg0 = MockMessage(Time(seconds=t), 33)
msg1 = MockMessage(Time(seconds=t), 34)
m0.signalMessage(msg0)
self.assertEqual(self.collector, [])
m1.signalMessage(msg1)
self.assertEqual(self.collector, [(msg0, msg1)])

# Scramble sequences of length N. Make sure that TimeSequencer recombines them.
# Scramble sequences of length N.
# Make sure that TimeSequencer recombines them.
random.seed(0)
for N in range(1, 10):
m0 = MockFilter()
m1 = MockFilter()
seq0 = [MockMessage(rospy.Time(t), random.random()) for t in range(N)]
seq1 = [MockMessage(rospy.Time(t), random.random()) for t in range(N)]
seq0 = [MockMessage(Time(seconds=t), random.random())\
for t in range(N)]
seq1 = [MockMessage(Time(seconds=t), random.random())\
for t in range(N)]
# random.shuffle(seq0)
ts = ApproximateTimeSynchronizer([m0, m1], N, 0.1)
ts.registerCallback(self.cb_collector_2msg)
Expand All @@ -96,32 +101,30 @@ def test_approx(self):
m1.signalMessage(msg)
self.assertEqual(set(self.collector), set(zip(seq0, seq1)))

# Scramble sequences of length N of headerless and header-having messages.
# Make sure that TimeSequencer recombines them.
rospy.rostime.set_rostime_initialized(True)
# test headerless scenarios: scramble sequences of length N of
# headerless and header-having messages.
random.seed(0)
for N in range(1, 10):
m0 = MockFilter()
m1 = MockFilter()
seq0 = [MockMessage(rospy.Time(t), random.random()) for t in range(N)]
seq0 = [MockMessage((ROSClock().now() + Duration(seconds=t)),
random.random()) for t in range(N)]
seq1 = [MockHeaderlessMessage(random.random()) for t in range(N)]
# random.shuffle(seq0)
ts = ApproximateTimeSynchronizer([m0, m1], N, 0.1, allow_headerless=True)

ts = ApproximateTimeSynchronizer([m0, m1], N, 10,
allow_headerless=True)
ts.registerCallback(self.cb_collector_2msg)
self.collector = []
for msg in random.sample(seq0, N):
m0.signalMessage(msg)
self.assertEqual(self.collector, [])
for i in random.sample(range(N), N):

for i in range(N):
msg = seq1[i]
rospy.rostime._set_rostime(rospy.Time(i+0.05))
m1.signalMessage(msg)
self.assertEqual(set(self.collector), set(zip(seq0, seq1)))

if __name__ == '__main__':
if 1:
rostest.unitrun('camera_calibration', 'testapproxsync', TestApproxSync)
else:
suite = unittest.TestSuite()
suite.addTest(TestApproxSync('test_approx'))
unittest.TextTestRunner(verbosity=2).run(suite)
suite = unittest.TestSuite()
suite.addTest(TestApproxSync('test_approx'))
unittest.TextTestRunner(verbosity=2).run(suite)
Loading