Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mitigate delays from rospy long-running callbacks (#1901) #11

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 73 additions & 16 deletions clients/rospy/src/rospy/impl/tcpros_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from io import StringIO, BytesIO #Python 3.x
python3 = 1
import socket
import errno
import logging

import threading
Expand Down Expand Up @@ -88,7 +89,7 @@ def _is_use_tcp_keepalive():
_use_tcp_keepalive = val if code == 1 else True
return _use_tcp_keepalive

def recv_buff(sock, b, buff_size):
def recv_buff(sock, b, buff_size, block=True):
"""
Read data from socket into buffer.
@param sock: socket to read from
Expand All @@ -97,15 +98,44 @@ def recv_buff(sock, b, buff_size):
@type b: StringIO
@param buff_size: recv read size
@type buff_size: int
@param block: whether to block on first recv
@type block: bool
@return: number of bytes read
@rtype: int
"""
d = sock.recv(buff_size)
if d:
b.write(d)
return len(d)
else: #bomb out
raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
# Read all data available on the socket.
# read_messages will enforce the queue_size
# Block on the first read if block is set, then non-block on the rest to
# read whatever is available.
if block:
sock.setblocking(True)
else:
sock.setblocking(False)
bytes_received = 0
try:
while not is_shutdown():
d = sock.recv(buff_size)
if d:
b.write(d)
bytes_received += len(d)
else:
if bytes_received or not block:
# Either we have received bytes and a subsequent recv
# returned socket closed, or we were not blocking.
break
else:
# No bytes received and blocking
raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
sock.setblocking(False)
except socket.timeout:
pass
except (OSError, socket.error) as e:
# Handle blocking socket errors
if e.args[0] == errno.EAGAIN or e.args[0] == errno.EWOULDBLOCK:
pass
else:
raise e
return bytes_received

class TCPServer(object):
"""
Expand Down Expand Up @@ -720,9 +750,12 @@ def write_data(self, data):
raise TransportTerminated(str(errno)+' '+msg)
return True

def receive_once(self):
def receive_once(self, block=True):
"""
block until messages are read off of socket
block until at least one message is read off socket
or read all available data off of socket (if block is False)
@param block whether to block and read at least one message, or to just
read what is available on the socket
@return: list of newly received messages
@rtype: [Msg]
@raise TransportException: if unable to receive message due to error
Expand All @@ -734,12 +767,20 @@ def receive_once(self):
msg_queue = []
p = self.protocol
try:
sock.setblocking(1)
while not msg_queue and not self.done and not is_shutdown():
bytes_received = recv_buff(sock, b, p.buff_size, block)
self.stat_bytes += bytes_received
if b.tell() >= 4:
p.read_messages(b, msg_queue, sock)
if not msg_queue:
self.stat_bytes += recv_buff(sock, b, p.buff_size)
p.read_messages(b, msg_queue, sock)
# The caller may just be checking if there is more data between
# processing messages already queued for callbacks. In that
# case it is normal for there to be no data and we are not
# supposed to wait for it, so break the loop with no messages.
# Only do this after calling read_messages, as there may be
# leftover data from after reading the header in the buffer
# that needs to be deserialized.
if not block and not bytes_received:
break
self.stat_num_msg += len(msg_queue) #STATS
# set the _connection_header field
for m in msg_queue:
Expand Down Expand Up @@ -795,14 +836,30 @@ def receive_loop(self, msgs_callback):
"""
# - use assert here as this would be an internal error, aka bug
logger.debug("receive_loop for [%s]", self.name)
msgs = []
try:
# On first call after reading header, there may be messages in the
# leftover data that was read. Go ahead and read any leftover messages
# now non-blocking. If we did not, the receive_once below will call
# blocking since the loop has no messages queued, but we might only
# ever get one latched message that is in the leftovers, meaning we
# would block forever.
msgs += self.receive_once(False)
while not self.done and not is_shutdown():
try:
if self.socket is not None:
msgs = self.receive_once()
if not self.done and not is_shutdown():
msgs_callback(msgs, self)
# Only block if there are no msgs left to process
msgs += self.receive_once(len(msgs) == 0)
# Throw away any unprocessed messages before queue_size
# We only process one at a time to give us an
# opportunity to stay caught up on recv'ing data.
# Delays in recv can cause the queue to have stale data.
if self.protocol.queue_size is not None:
msgs = msgs[-self.protocol.queue_size:]
if not self.done and not is_shutdown() and len(msgs) > 0:
msgs_callback([msgs.pop(0)], self)
else:
msgs = []
self._reconnect()

except TransportException as e:
Expand Down