From 2931d3b412d1b11e15ccf39b6c3c6d6c05c42331 Mon Sep 17 00:00:00 2001 From: Ross Biro Date: Fri, 1 Apr 2022 09:00:43 -0400 Subject: [PATCH] Ported melodic speed ups --- .../src/rosserial_python/SerialClient.py | 200 +++++++++++++----- 1 file changed, 150 insertions(+), 50 deletions(-) diff --git a/rosserial_python/src/rosserial_python/SerialClient.py b/rosserial_python/src/rosserial_python/SerialClient.py index 6b1b87574..d09cbc88f 100644 --- a/rosserial_python/src/rosserial_python/SerialClient.py +++ b/rosserial_python/src/rosserial_python/SerialClient.py @@ -44,6 +44,11 @@ import sys import threading import time +import select +import os +import fcntl +import io +#import line_profiler as Profiler from serial import Serial, SerialException, SerialTimeoutException @@ -255,10 +260,31 @@ def listen(self): self.startSerialClient() rospy.loginfo("startSerialClient() exited") + def fileno(self): + return self.socket.fileno() + def startSerialClient(self): client = SerialClient(self) + # profile = Profiler.LineProfiler() try: + # profile.add_function(SerialClient.run) + # profile.add_function(SerialClient.setupPublisher) + # profile.add_function(SerialClient.setupSubscriber) + # profile.add_function(SerialClient.setupServiceServerPublisher) + # profile.add_function(SerialClient.setupServiceClientPublisher) + # profile.add_function(SerialClient.setupServiceServerSubscriber) + # profile.add_function(SerialClient.setupServiceClientSubscriber) + # profile.add_function(SerialClient.handleParameterRequest) + # profile.add_function(SerialClient.handleLoggingRequest) + # profile.add_function(SerialClient.tryRead) + # profile.add_function(SerialClient._tryRead) + + # wrapped_run = profile(lambda: client.run()) + # wrapped_run() + # profile.dump_stats('/tmp/serial.profile') + # profile.print_stats() client.run() + except KeyboardInterrupt: pass except RuntimeError: @@ -294,18 +320,26 @@ def write(self, data): totalsent += self.socket.send(data[totalsent:]) except BrokenPipeError: raise RuntimeError("RosSerialServer.write() socket connection broken") + except io.BlockingIOError as e: + sent = e.characters_written + if sent == 0: + # being a quick hack, we don't turn + # on/off poll flags and handle everything + # in a single threaded loop. + # of course, this will make a bad + # situation worse. + time.sleep(0.001) + continue def read(self, rqsted_length): self.msg = b'' if not self.isConnected: return self.msg - while len(self.msg) < rqsted_length: - chunk = self.socket.recv(rqsted_length - len(self.msg)) - if chunk == b'': - raise RuntimeError("RosSerialServer.read() socket connection broken") - self.msg = self.msg + chunk - return self.msg + chunk = self.socket.recv(rqsted_length) + if chunk == '': + raise RuntimeError("RosSerialServer.read() socket connection broken") + return chunk def inWaiting(self): try: # the caller checks just for <1, so we'll peek at just one byte @@ -328,6 +362,18 @@ class SerialClient(object): protocol_ver2 = b'\xfe' protocol_ver = protocol_ver2 + # Named after the Doctor Who Episode + class Blink(object): + """ + Wake up the poll and queue.get and don't let them sleep anymore. + """ + def __init__(parent): + self.parent = parent + + def invoke(self): + self.parent._pipe_w.write('Wakie, Wakie'.encode()) + self.parent.write_queue.put(self) + def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=False): """ Initialize node, connect to bus, attempt to negotiate topics. """ @@ -336,6 +382,7 @@ def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=Fal self.write_lock = threading.RLock() self.write_queue = queue.Queue() self.write_thread = None + self.byte_buffer = bytearray() self.lastsync = rospy.Time(0) self.lastsync_lost = rospy.Time(0) @@ -425,28 +472,67 @@ def txStopRequest(self): rospy.loginfo("Sending tx stop request") def tryRead(self, length): + while len(self.byte_buffer) < length: + self.byte_buffer.extend(self._tryRead(length-len(self.byte_buffer), 65536)) + b = self.byte_buffer[0:length] + self.byte_buffer = self.byte_buffer[length:] + return b + + def _tryRead(self, length, chunk): try: read_start = time.time() bytes_remaining = length result = bytearray() while bytes_remaining != 0 and time.time() - read_start < self.timeout: - with self.read_lock: - received = self.port.read(bytes_remaining) + + # Block here until there is something to do. + waittime = self.timeout - (time.time() - read_start) + ready = [] + if waittime <= 0: + break + + # Hacky, but better than busy waiting. + ready = self.poll.poll(waittime * 1000) + + if self.port.fileno() not in [ i[0] for i in ready ]: + break + + try: + with self.read_lock: + received = self.port.read(max(chunk, bytes_remaining)) + except IOError as e: + continue + if len(received) != 0: + #line_profile next target: 511 13812 1297061.0 93.9 10.5 self.last_read = rospy.Time.now() result.extend(received) bytes_remaining -= len(received) + if bytes_remaining < 0: + bytes_remaining = 0 if bytes_remaining != 0: raise IOError("Returned short (expected %d bytes, received %d instead)." % (length, length - bytes_remaining)) - return bytes(result) + return result except Exception as e: raise IOError("Serial Port read failure: %s" % e) def run(self): """ Forward recieved messages to appropriate publisher. """ + # countdown = 1000 for profiling + + # Set Non-Blocking so are read ahead in try_read won't block. + # Means we could get short writes in theory. + flag = fcntl.fcntl(self.port.fileno(), fcntl.F_GETFL) + fcntl.fcntl(self.port.fileno(), fcntl.F_SETFL, flag | os.O_NONBLOCK) + + # Get a pipe object used to wake up the sleepers. + # Be careful, these are not initialized elsewhere. + (self._pipe_r, self._pipe_w) = os.pipe() + rospy.on_shutdown(lambda : SerialClient.Blink(self).invoke()) + # Launch write thread. if self.write_thread is None: self.write_thread = threading.Thread(target=self.processWriteQueue) @@ -456,36 +542,52 @@ def run(self): # Handle reading. data = '' read_step = None - while self.write_thread.is_alive() and not rospy.is_shutdown(): - if (rospy.Time.now() - self.lastsync).to_sec() > (self.timeout * 3): - if self.synced: - rospy.logerr("Lost sync with device, restarting...") - else: - rospy.logerr("Unable to sync with device; possible link problem or link software version mismatch such as hydro rosserial_python with groovy Arduino") - self.lastsync_lost = rospy.Time.now() - self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_NO_SYNC) - self.requestTopics() - self.lastsync = rospy.Time.now() + + self.poll = select.poll() + self.poll.register(self.port, select.POLLIN) + self.poll.register(self._pipe_r, select.POLLIN) + + while True: + # for profiling. + # countdown = countdown - 1 + # if countdown <= 0: + # return + # if countdown % 100 == 0: + # rospy.logerr("countdown=%d" %countdown) + + if len(self.byte_buffer) == 0: + waittime = 3 * self.timeout - (rospy.Time.now() - self.lastsync).to_sec() + ready_fds = [] + if waittime > 0: + ready_fds = self.poll.poll(waittime * 1000) + if not ready_fds: + if self.synced: + rospy.logerr("Lost sync with device, restarting...") + else: + rospy.logerr("Unable to sync with device; possible link problem or linsoftware version mismatch such as hydro rosserial_python with groovy Arduino") + self.lastsync_lost = rospy.Time.now() + self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, + ERROR_NO_SYNC) + self.requestTopics() + self.lastsync = rospy.Time.now() + continue + elif self._pipe_r in [ i[0] for i in ready_fds ]: + break # This try-block is here because we make multiple calls to read(). Any one of them can throw # an IOError if there's a serial problem or timeout. In that scenario, a single handler at the # bottom attempts to reconfigure the topics. try: - with self.read_lock: - if self.port.inWaiting() < 1: - time.sleep(0.001) - continue - # Find sync flag. flag = [0, 0] read_step = 'syncflag' - flag[0] = self.tryRead(1) + flag[0] = bytes(self.tryRead(1)) if (flag[0] != self.header): continue # Find protocol version. read_step = 'protocol' - flag[1] = self.tryRead(1) + flag[1] = bytes(self.tryRead(1)) if flag[1] != self.protocol_ver: self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_MISMATCHED_PROTOCOL) rospy.logerr("Mismatched protocol version in packet (%s): lost sync or rosserial_python is from different ros release than the rosserial client" % repr(flag[1])) @@ -528,19 +630,18 @@ def run(self): # Reada checksum for topic id and msg read_step = 'data checksum' - chk = self.tryRead(1) - checksum = sum(array.array('B', topic_id_header + msg + chk)) + chk = bytes(self.tryRead(1)) + checksum = sum(topic_id_header) + sum(msg) + ord(chk) # Validate checksum. if checksum % 256 == 255: self.synced = True self.lastsync_success = rospy.Time.now() try: - self.callbacks[topic_id](msg) + self.callbacks[topic_id](bytes(msg)) except KeyError: rospy.logerr("Tried to publish before configured, topic id %d" % topic_id) self.requestTopics() - time.sleep(0.001) else: rospy.loginfo("wrong checksum for topic id and msg") @@ -775,26 +876,25 @@ def processWriteQueue(self): Main loop for the thread that processes outgoing data to write to the serial port. """ while not rospy.is_shutdown(): - if self.write_queue.empty(): - time.sleep(0.01) - else: - data = self.write_queue.get() - while True: - try: - if isinstance(data, tuple): - topic, msg = data - self._send(topic, msg) - elif isinstance(data, bytes): - self._write(data) - else: - rospy.logerr("Trying to write invalid data type: %s" % type(data)) - break - except SerialTimeoutException as exc: - rospy.logerr('Write timeout: %s' % exc) - time.sleep(1) - except RuntimeError as exc: - rospy.logerr('Write thread exception: %s' % exc) - break + data = self.write_queue.get() + while True: + try: + if isinstance(data, tuple): + topic, msg = data + self._send(topic, msg) + elif isinstance(data, bytes): + self._write(data) + elif isinstance (data, SerialClient.Blink): + break + else: + rospy.logerr("Trying to write invalid data type: %s" % type(data)) + break + except SerialTimeoutException as exc: + rospy.logerr('Write timeout: %s' % exc) + time.sleep(1) + except RuntimeError as exc: + rospy.logerr('Write thread exception: %s' % exc) + break def sendDiagnostics(self, level, msg_text):