Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix delays from rospy long-running callbacks #12

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions clients/rospy/src/rospy/impl/tcpros_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,31 @@ def _reconnect(self):

time.sleep(interval)

def callback_loop(self, msgs_callback):
while not self.done and not is_shutdown():
try:
with self.msg_queue_lock:
# Data that was leftover from reading header may have made
# messages immediately available (such as from a latched
# topic). Go ahead and process anything we already have before
# waiting.
while self.msg_queue:
msg = self.msg_queue.pop(0)
# Be sure to not hold the message queue lock while calling
# the callback, it may take a while.
self.msg_queue_lock.release()
msgs_callback([msg], self)
self.msg_queue_lock.acquire()
self.msg_queue_condition.wait()
except:
# in many cases this will be a normal hangup, but log internally
try:
#1467 sometimes we get exceptions due to
#interpreter shutdown, so blanket ignore those if
#the reporting fails
rospydebug("exception in callback loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
except: pass

def receive_loop(self, msgs_callback):
"""
Receive messages until shutdown
Expand All @@ -795,13 +820,27 @@ def receive_loop(self, msgs_callback):
"""
# - use assert here as this would be an internal error, aka bug
logger.debug("receive_loop for [%s]", self.name)
# Start a callback thread to process the callbacks. This way the
# receive loop can continue calling recv() while a long-running
# callback is running.
try:
self.msg_queue = []
self.msg_queue_lock = threading.Lock()
self.msg_queue_condition = threading.Condition(self.msg_queue_lock)
callback_thread = threading.Thread(
target=self.callback_loop,
args=(msgs_callback,))
callback_thread.start()
while not self.done and not is_shutdown():
try:
if self.socket is not None:
msgs = self.receive_once()
if not self.done and not is_shutdown():
msgs_callback(msgs, self)
with self.msg_queue_lock:
self.msg_queue += msgs
if self.protocol.queue_size is not None:
self.msg_queue = self.msg_queue[-self.protocol.queue_size:]
self.msg_queue_condition.notify()
else:
self._reconnect()

Expand Down Expand Up @@ -832,7 +871,9 @@ def receive_loop(self, msgs_callback):
#the reporting fails
rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
except: pass

with self.msg_queue_lock:
self.msg_queue_condition.notify()
callback_thread.join()
rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
finally:
if not self.done:
Expand Down