diff --git a/clients/rospy/src/rospy/impl/tcpros_base.py b/clients/rospy/src/rospy/impl/tcpros_base.py index 275790075f..5f521d08c4 100644 --- a/clients/rospy/src/rospy/impl/tcpros_base.py +++ b/clients/rospy/src/rospy/impl/tcpros_base.py @@ -42,6 +42,7 @@ from io import StringIO, BytesIO #Python 3.x python3 = 1 import socket +import errno import logging import threading @@ -88,7 +89,7 @@ def _is_use_tcp_keepalive(): _use_tcp_keepalive = val if code == 1 else True return _use_tcp_keepalive -def recv_buff(sock, b, buff_size): +def recv_buff(sock, b, buff_size, block=True): """ Read data from socket into buffer. @param sock: socket to read from @@ -97,15 +98,44 @@ def recv_buff(sock, b, buff_size): @type b: StringIO @param buff_size: recv read size @type buff_size: int + @param block: whether to block on first recv + @type block: bool @return: number of bytes read @rtype: int """ - d = sock.recv(buff_size) - if d: - b.write(d) - return len(d) - else: #bomb out - raise TransportTerminated("unable to receive data from sender, check sender's logs for details") + # Read all data available on the socket. + # read_messages will enforce the queue_size + # Block on the first read if block is set, then non-block on the rest to + # read whatever is available. + if block: + sock.setblocking(True) + else: + sock.setblocking(False) + bytes_received = 0 + try: + while not is_shutdown(): + d = sock.recv(buff_size) + if d: + b.write(d) + bytes_received += len(d) + else: + if bytes_received or not block: + # Either we have received bytes and a subsequent recv + # returned socket closed, or we were not blocking. + break + else: + # No bytes received and blocking + raise TransportTerminated("unable to receive data from sender, check sender's logs for details") + sock.setblocking(False) + except socket.timeout: + pass + except (OSError, socket.error) as e: + # Handle blocking socket errors + if e.args[0] == errno.EAGAIN or e.args[0] == errno.EWOULDBLOCK: + pass + else: + raise e + return bytes_received class TCPServer(object): """ @@ -720,9 +750,12 @@ def write_data(self, data): raise TransportTerminated(str(errno)+' '+msg) return True - def receive_once(self): + def receive_once(self, block=True): """ - block until messages are read off of socket + block until at least one message is read off socket + or read all available data off of socket (if block is False) + @param block whether to block and read at least one message, or to just + read what is available on the socket @return: list of newly received messages @rtype: [Msg] @raise TransportException: if unable to receive message due to error @@ -734,12 +767,20 @@ def receive_once(self): msg_queue = [] p = self.protocol try: - sock.setblocking(1) while not msg_queue and not self.done and not is_shutdown(): + bytes_received = recv_buff(sock, b, p.buff_size, block) + self.stat_bytes += bytes_received if b.tell() >= 4: - p.read_messages(b, msg_queue, sock) - if not msg_queue: - self.stat_bytes += recv_buff(sock, b, p.buff_size) + p.read_messages(b, msg_queue, sock) + # The caller may just be checking if there is more data between + # processing messages already queued for callbacks. In that + # case it is normal for there to be no data and we are not + # supposed to wait for it, so break the loop with no messages. + # Only do this after calling read_messages, as there may be + # leftover data from after reading the header in the buffer + # that needs to be deserialized. + if not block and not bytes_received: + break self.stat_num_msg += len(msg_queue) #STATS # set the _connection_header field for m in msg_queue: @@ -795,14 +836,30 @@ def receive_loop(self, msgs_callback): """ # - use assert here as this would be an internal error, aka bug logger.debug("receive_loop for [%s]", self.name) + msgs = [] try: + # On first call after reading header, there may be messages in the + # leftover data that was read. Go ahead and read any leftover messages + # now non-blocking. If we did not, the receive_once below will call + # blocking since the loop has no messages queued, but we might only + # ever get one latched message that is in the leftovers, meaning we + # would block forever. + msgs += self.receive_once(False) while not self.done and not is_shutdown(): try: if self.socket is not None: - msgs = self.receive_once() - if not self.done and not is_shutdown(): - msgs_callback(msgs, self) + # Only block if there are no msgs left to process + msgs += self.receive_once(len(msgs) == 0) + # Throw away any unprocessed messages before queue_size + # We only process one at a time to give us an + # opportunity to stay caught up on recv'ing data. + # Delays in recv can cause the queue to have stale data. + if self.protocol.queue_size is not None: + msgs = msgs[-self.protocol.queue_size:] + if not self.done and not is_shutdown() and len(msgs) > 0: + msgs_callback([msgs.pop(0)], self) else: + msgs = [] self._reconnect() except TransportException as e: