Skip to content

Commit

Permalink
fix delays from rospy long-running callbacks
Browse files Browse the repository at this point in the history
Long-running callbacks in rospy can cause extreme amounts of buffering
resulting in unnecessary delay, essentially ignoring the queue_size
setting (ros#1901). This can already be somewhat mitigated by setting
buff_size to be larger than the amount of data that could be buffered
by a long running callback. However, setting buff_size to a correct
value is not possible for the user of the API if the amount of time in
the callback or the amount of data that would be transmitted is
unknown. Also, even with a correct buff_size and a queue_size of 1, the
received data may still be the oldest of all data transmitted while the
callback was running.

Fix the delays in such cases by running callbacks in a separate thread.
The receive_loop then calls recv() concurrently with the long running
callback, enforcing queue_size as new data is received. This fixes the
latency in the data when queue_size is set to be similar to roscpp.

This fixes ros#1901
  • Loading branch information
C. Andy Martin committed Jul 20, 2021
1 parent b0b2d5e commit 14c4be0
Showing 1 changed file with 43 additions and 2 deletions.
45 changes: 43 additions & 2 deletions clients/rospy/src/rospy/impl/tcpros_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,31 @@ def _reconnect(self):

time.sleep(interval)

def callback_loop(self, msgs_callback):
while not self.done and not is_shutdown():
try:
with self.msg_queue_lock:
# Data that was leftover from reading header may have made
# messages immediately available (such as from a latched
# topic). Go ahead and process anything we already have before
# waiting.
while self.msg_queue:
msg = self.msg_queue.pop(0)
# Be sure to not hold the message queue lock while calling
# the callback, it may take a while.
self.msg_queue_lock.release()
msgs_callback([msg], self)
self.msg_queue_lock.acquire()
self.msg_queue_condition.wait()
except:
# in many cases this will be a normal hangup, but log internally
try:
#1467 sometimes we get exceptions due to
#interpreter shutdown, so blanket ignore those if
#the reporting fails
rospydebug("exception in callback loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
except: pass

def receive_loop(self, msgs_callback):
"""
Receive messages until shutdown
Expand All @@ -804,13 +829,27 @@ def receive_loop(self, msgs_callback):
"""
# - use assert here as this would be an internal error, aka bug
logger.debug("receive_loop for [%s]", self.name)
# Start a callback thread to process the callbacks. This way the
# receive loop can continue calling recv() while a long-running
# callback is running.
try:
self.msg_queue = []
self.msg_queue_lock = threading.Lock()
self.msg_queue_condition = threading.Condition(self.msg_queue_lock)
callback_thread = threading.Thread(
target=self.callback_loop,
args=(msgs_callback,))
callback_thread.start()
while not self.done and not is_shutdown():
try:
if self.socket is not None:
msgs = self.receive_once()
if not self.done and not is_shutdown():
msgs_callback(msgs, self)
with self.msg_queue_lock:
self.msg_queue += msgs
if self.protocol.queue_size is not None:
self.msg_queue = self.msg_queue[-self.protocol.queue_size:]
self.msg_queue_condition.notify()
else:
self._reconnect()

Expand Down Expand Up @@ -841,7 +880,9 @@ def receive_loop(self, msgs_callback):
#the reporting fails
rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
except: pass

with self.msg_queue_lock:
self.msg_queue_condition.notify()
callback_thread.join()
rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
finally:
if not self.done:
Expand Down

0 comments on commit 14c4be0

Please sign in to comment.