From 1db5e98b93b45d99d720d64ca9de612f983ffb18 Mon Sep 17 00:00:00 2001 From: "C. Andy Martin" Date: Fri, 9 Jul 2021 06:23:58 -0900 Subject: [PATCH] fix delays from rospy long-running callbacks Long-running callbacks in rospy can cause extreme amounts of buffering resulting in unnecessary delay, essentially ignoring the queue_size setting (#1901). This can already be somewhat mitigated by setting buff_size to be larger than the amount of data that could be buffered by a long running callback. However, setting buff_size to a correct value is not possible for the user of the API if the amount of time in the callback or the amount of data that would be transmitted is unknown. Also, even with a correct buff_size and a queue_size of 1, the received data may still be the oldest of all data transmitted while the callback was running. Fix the delays in such cases by running callbacks in a separate thread. The receive_loop then calls recv() concurrently with the long running callback, enforcing queue_size as new data is received. This fixes the latency in the data when queue_size is set to be similar to roscpp. This fixes #1901 --- clients/rospy/src/rospy/impl/tcpros_base.py | 45 ++++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/clients/rospy/src/rospy/impl/tcpros_base.py b/clients/rospy/src/rospy/impl/tcpros_base.py index 275790075f..6486faa1a7 100644 --- a/clients/rospy/src/rospy/impl/tcpros_base.py +++ b/clients/rospy/src/rospy/impl/tcpros_base.py @@ -787,6 +787,31 @@ def _reconnect(self): time.sleep(interval) + def callback_loop(self, msgs_callback): + while not self.done and not is_shutdown(): + try: + with self.msg_queue_lock: + # Data that was leftover from reading header may have made + # messages immediately available (such as from a latched + # topic). Go ahead and process anything we already have before + # waiting. + while self.msg_queue: + msg = self.msg_queue.pop(0) + # Be sure to not hold the message queue lock while calling + # the callback, it may take a while. + self.msg_queue_lock.release() + msgs_callback([msg], self) + self.msg_queue_lock.acquire() + self.msg_queue_condition.wait() + except: + # in many cases this will be a normal hangup, but log internally + try: + #1467 sometimes we get exceptions due to + #interpreter shutdown, so blanket ignore those if + #the reporting fails + rospydebug("exception in callback loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc()) + except: pass + def receive_loop(self, msgs_callback): """ Receive messages until shutdown @@ -795,13 +820,27 @@ def receive_loop(self, msgs_callback): """ # - use assert here as this would be an internal error, aka bug logger.debug("receive_loop for [%s]", self.name) + # Start a callback thread to process the callbacks. This way the + # receive loop can continue calling recv() while a long-running + # callback is running. try: + self.msg_queue = [] + self.msg_queue_lock = threading.Lock() + self.msg_queue_condition = threading.Condition(self.msg_queue_lock) + callback_thread = threading.Thread( + target=self.callback_loop, + args=(msgs_callback,)) + callback_thread.start() while not self.done and not is_shutdown(): try: if self.socket is not None: msgs = self.receive_once() if not self.done and not is_shutdown(): - msgs_callback(msgs, self) + with self.msg_queue_lock: + self.msg_queue += msgs + if self.protocol.queue_size is not None: + self.msg_queue = self.msg_queue[-self.protocol.queue_size:] + self.msg_queue_condition.notify() else: self._reconnect() @@ -832,7 +871,9 @@ def receive_loop(self, msgs_callback): #the reporting fails rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc()) except: pass - + with self.msg_queue_lock: + self.msg_queue_condition.notify() + callback_thread.join() rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name) finally: if not self.done: