From 3174ca2c1cd0e0c1d867d854d9ed8af2a16d3a71 Mon Sep 17 00:00:00 2001 From: "C. Andy Martin" Date: Fri, 9 Jul 2021 06:23:58 -0900 Subject: [PATCH] mitigate delays from rospy long-running callbacks (#1901) Long-running callbacks in rospy can cause extreme amounts of buffering resulting in unnecessary delay, essentially ignoring the queue_size setting. This can already by somewhat mitigated by setting buff_size to be larger than the amount of data that could be buffered by a long running callback. However, setting buff_size to a correct value is not possible for the user of the API if the amount of time in the callback or the amount of data that would be transmitted is unknown. Greatly mitigate the delays in such cases by altering the structure of the receive logic. Instead of recv()ing up to buff_size data, then calling the callbacks on every message received, interleave calling recv() between each callback, enforcing queue_size as we go. Also, recv() all data currently available when calling recv() by calling recv() non-blocking after calling it blocking. While it is still possible to have stale data, even with a queue_size of 1, it is less likely, especially if the publisher of the data is on the same host. Even if not, the staleness of the data with a queue_size of 1 is now bounded by the runtime of the callback instead of by buff_size. This mitigation was chosen over a complete fix to the problem because a complete fix would involve a new thread to handle callbacks. While a new thread would allow recv() to be running all the time, even during the long callback, it is a more complex solution. Since rospy is going to be replaced in ROS2, this more tactical mitigation seems appropriate. This mitigates #1901 --- clients/rospy/src/rospy/impl/tcpros_base.py | 89 +++++++++++++++++---- 1 file changed, 73 insertions(+), 16 deletions(-) diff --git a/clients/rospy/src/rospy/impl/tcpros_base.py b/clients/rospy/src/rospy/impl/tcpros_base.py index 275790075f..5f521d08c4 100644 --- a/clients/rospy/src/rospy/impl/tcpros_base.py +++ b/clients/rospy/src/rospy/impl/tcpros_base.py @@ -42,6 +42,7 @@ from io import StringIO, BytesIO #Python 3.x python3 = 1 import socket +import errno import logging import threading @@ -88,7 +89,7 @@ def _is_use_tcp_keepalive(): _use_tcp_keepalive = val if code == 1 else True return _use_tcp_keepalive -def recv_buff(sock, b, buff_size): +def recv_buff(sock, b, buff_size, block=True): """ Read data from socket into buffer. @param sock: socket to read from @@ -97,15 +98,44 @@ def recv_buff(sock, b, buff_size): @type b: StringIO @param buff_size: recv read size @type buff_size: int + @param block: whether to block on first recv + @type block: bool @return: number of bytes read @rtype: int """ - d = sock.recv(buff_size) - if d: - b.write(d) - return len(d) - else: #bomb out - raise TransportTerminated("unable to receive data from sender, check sender's logs for details") + # Read all data available on the socket. + # read_messages will enforce the queue_size + # Block on the first read if block is set, then non-block on the rest to + # read whatever is available. + if block: + sock.setblocking(True) + else: + sock.setblocking(False) + bytes_received = 0 + try: + while not is_shutdown(): + d = sock.recv(buff_size) + if d: + b.write(d) + bytes_received += len(d) + else: + if bytes_received or not block: + # Either we have received bytes and a subsequent recv + # returned socket closed, or we were not blocking. + break + else: + # No bytes received and blocking + raise TransportTerminated("unable to receive data from sender, check sender's logs for details") + sock.setblocking(False) + except socket.timeout: + pass + except (OSError, socket.error) as e: + # Handle blocking socket errors + if e.args[0] == errno.EAGAIN or e.args[0] == errno.EWOULDBLOCK: + pass + else: + raise e + return bytes_received class TCPServer(object): """ @@ -720,9 +750,12 @@ def write_data(self, data): raise TransportTerminated(str(errno)+' '+msg) return True - def receive_once(self): + def receive_once(self, block=True): """ - block until messages are read off of socket + block until at least one message is read off socket + or read all available data off of socket (if block is False) + @param block whether to block and read at least one message, or to just + read what is available on the socket @return: list of newly received messages @rtype: [Msg] @raise TransportException: if unable to receive message due to error @@ -734,12 +767,20 @@ def receive_once(self): msg_queue = [] p = self.protocol try: - sock.setblocking(1) while not msg_queue and not self.done and not is_shutdown(): + bytes_received = recv_buff(sock, b, p.buff_size, block) + self.stat_bytes += bytes_received if b.tell() >= 4: - p.read_messages(b, msg_queue, sock) - if not msg_queue: - self.stat_bytes += recv_buff(sock, b, p.buff_size) + p.read_messages(b, msg_queue, sock) + # The caller may just be checking if there is more data between + # processing messages already queued for callbacks. In that + # case it is normal for there to be no data and we are not + # supposed to wait for it, so break the loop with no messages. + # Only do this after calling read_messages, as there may be + # leftover data from after reading the header in the buffer + # that needs to be deserialized. + if not block and not bytes_received: + break self.stat_num_msg += len(msg_queue) #STATS # set the _connection_header field for m in msg_queue: @@ -795,14 +836,30 @@ def receive_loop(self, msgs_callback): """ # - use assert here as this would be an internal error, aka bug logger.debug("receive_loop for [%s]", self.name) + msgs = [] try: + # On first call after reading header, there may be messages in the + # leftover data that was read. Go ahead and read any leftover messages + # now non-blocking. If we did not, the receive_once below will call + # blocking since the loop has no messages queued, but we might only + # ever get one latched message that is in the leftovers, meaning we + # would block forever. + msgs += self.receive_once(False) while not self.done and not is_shutdown(): try: if self.socket is not None: - msgs = self.receive_once() - if not self.done and not is_shutdown(): - msgs_callback(msgs, self) + # Only block if there are no msgs left to process + msgs += self.receive_once(len(msgs) == 0) + # Throw away any unprocessed messages before queue_size + # We only process one at a time to give us an + # opportunity to stay caught up on recv'ing data. + # Delays in recv can cause the queue to have stale data. + if self.protocol.queue_size is not None: + msgs = msgs[-self.protocol.queue_size:] + if not self.done and not is_shutdown() and len(msgs) > 0: + msgs_callback([msgs.pop(0)], self) else: + msgs = [] self._reconnect() except TransportException as e: