diff --git a/ros/src/rosserial_python/src/rosserial_python/SerialClient.py b/ros/src/rosserial_python/src/rosserial_python/SerialClient.py index 180dcef..b82993a 100644 --- a/ros/src/rosserial_python/src/rosserial_python/SerialClient.py +++ b/ros/src/rosserial_python/src/rosserial_python/SerialClient.py @@ -61,38 +61,43 @@ ERROR_NO_SYNC = "no sync with device" ERROR_PACKET_FAILED = "Packet Failed : Failed to read msg data" + def load_pkg_module(package, directory): - #check if its in the python path + # check if its in the python path path = sys.path try: imp.find_module(package) except ImportError: roslib.load_manifest(package) try: - m = __import__( package + '.' + directory ) + m = __import__(package + '.' + directory) except ImportError: - rospy.logerr( "Cannot import package : %s"% package ) - rospy.logerr( "sys.path was " + str(path) ) + rospy.logerr("Cannot import package : %s" % package) + rospy.logerr("sys.path was " + str(path)) return None return m + def load_message(package, message): m = load_pkg_module(package, 'msg') m2 = getattr(m, 'msg') return getattr(m2, message) -def load_service(package,service): + +def load_service(package, service): s = load_pkg_module(package, 'srv') s = getattr(s, 'srv') srv = getattr(s, service) mreq = getattr(s, service+"Request") mres = getattr(s, service+"Response") - return srv,mreq,mres + return srv, mreq, mres + class Publisher: """ Publisher forwards messages from the serial device to ROS. """ + def __init__(self, topic_info): """ Create a new publisher. """ self.topic = topic_info.topic_name @@ -103,7 +108,8 @@ def __init__(self, topic_info): if self.message._md5sum == topic_info.md5sum: self.publisher = rospy.Publisher(self.topic, self.message, queue_size=10) else: - raise Exception('Checksum does not match: ' + self.message._md5sum + ',' + topic_info.md5sum) + raise Exception('Checksum does not match: ' + + self.message._md5sum + ',' + topic_info.md5sum) def handlePacket(self, data): """ Forward message to ROS network. """ @@ -128,7 +134,8 @@ def __init__(self, topic_info, parent): if self.message._md5sum == topic_info.md5sum: self.subscriber = rospy.Subscriber(self.topic, self.message, self.callback) else: - raise Exception('Checksum does not match: ' + self.message._md5sum + ',' + topic_info.md5sum) + raise Exception('Checksum does not match: ' + + self.message._md5sum + ',' + topic_info.md5sum) def callback(self, msg): """ Forward message to serial device. """ @@ -140,6 +147,7 @@ def unregister(self): rospy.loginfo("Removing subscriber: %s", self.topic) self.subscriber.unregister() + class ServiceServer: """ ServiceServer responds to requests from ROS. @@ -213,6 +221,7 @@ def handlePacket(self, data): resp.serialize(data_buffer) self.parent.send(self.id, data_buffer.getvalue()) + class RosSerialServer: """ RosSerialServer waits for a socket connection then passes itself, forked as a @@ -220,6 +229,7 @@ class RosSerialServer: for additional connections. Each forked process is a new ros node, and proxies ros operations (e.g. publish/subscribe) from its connection to the rest of ros. """ + def __init__(self, tcp_portnum, fork_server=False): print "Fork_server is: ", fork_server self.tcp_portnum = tcp_portnum @@ -228,21 +238,21 @@ def __init__(self, tcp_portnum, fork_server=False): def listen(self): self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - #bind the socket to a public host, and a well-known port - self.serversocket.bind(("", self.tcp_portnum)) #become a server socket + # bind the socket to a public host, and a well-known port + self.serversocket.bind(("", self.tcp_portnum)) # become a server socket self.serversocket.listen(1) while True: - #accept connections + # accept connections print "waiting for socket connection" (clientsocket, address) = self.serversocket.accept() - #now do something with the clientsocket + # now do something with the clientsocket rospy.loginfo("Established a socket connection from %s on port %s" % (address)) self.socket = clientsocket self.isConnected = True - if self.fork_server: # if configured to launch server in a separate process + if self.fork_server: # if configured to launch server in a separate process rospy.loginfo("Forking a socket server process") process = multiprocessing.Process(target=self.startSocketServer, args=(address)) process.daemon = True @@ -271,7 +281,7 @@ def startSerialClient(self): sub.unregister() for srv in client.services.values(): srv.unregister() - #pass + # pass def startSocketServer(self, port, address): rospy.loginfo("starting ROS Serial Python Node serial_node-%r" % (address,)) @@ -281,6 +291,9 @@ def startSocketServer(self, port, address): def flushInput(self): pass + def flushOutput(self): + pass + def write(self, data): if not self.isConnected: return @@ -306,8 +319,8 @@ def read(self, rqsted_length): return self.msg def inWaiting(self): - try: # the caller checks just for <1, so we'll peek at just one byte - chunk = self.socket.recv(1, socket.MSG_DONTWAIT|socket.MSG_PEEK) + try: # the caller checks just for <1, so we'll peek at just one byte + chunk = self.socket.recv(1, socket.MSG_DONTWAIT | socket.MSG_PEEK) if chunk == '': raise RuntimeError("RosSerialServer.inWaiting() socket connection broken") return len(chunk) @@ -316,6 +329,7 @@ def inWaiting(self): return 0 raise + class SerialClient(object): """ ServiceServer responds to requests from the serial device. @@ -339,21 +353,23 @@ def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=Fal self.synced = False self.fix_pyserial_for_test = fix_pyserial_for_test - self.pub_diagnostics = rospy.Publisher('/diagnostics', diagnostic_msgs.msg.DiagnosticArray, queue_size=10) + self.pub_diagnostics = rospy.Publisher( + '/diagnostics', diagnostic_msgs.msg.DiagnosticArray, queue_size=10) if port is None: # no port specified, listen for any new port? pass elif hasattr(port, 'read'): - #assume its a filelike object - self.port=port + # assume its a filelike object + self.port = port else: # open a specific port while not rospy.is_shutdown(): try: if self.fix_pyserial_for_test: # see https://github.com/pyserial/pyserial/issues/59 - self.port = Serial(port, baud, timeout=self.timeout, write_timeout=10, rtscts=True, dsrdtr=True) + self.port = Serial(port, baud, timeout=self.timeout, + write_timeout=10, rtscts=True, dsrdtr=True) else: self.port = Serial(port, baud, timeout=self.timeout, write_timeout=10) break @@ -373,7 +389,7 @@ def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=Fal self.protocol_ver = self.protocol_ver2 self.publishers = dict() # id:Publishers - self.subscribers = dict() # topic:Subscriber + self.subscribers = dict() # topic:Subscriber self.services = dict() # topic:Service self.buffer_out = -1 @@ -384,10 +400,14 @@ def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=Fal self.callbacks[TopicInfo.ID_PUBLISHER] = self.setupPublisher self.callbacks[TopicInfo.ID_SUBSCRIBER] = self.setupSubscriber # service client/servers have 2 creation endpoints (a publisher and a subscriber) - self.callbacks[TopicInfo.ID_SERVICE_SERVER+TopicInfo.ID_PUBLISHER] = self.setupServiceServerPublisher - self.callbacks[TopicInfo.ID_SERVICE_SERVER+TopicInfo.ID_SUBSCRIBER] = self.setupServiceServerSubscriber - self.callbacks[TopicInfo.ID_SERVICE_CLIENT+TopicInfo.ID_PUBLISHER] = self.setupServiceClientPublisher - self.callbacks[TopicInfo.ID_SERVICE_CLIENT+TopicInfo.ID_SUBSCRIBER] = self.setupServiceClientSubscriber + self.callbacks[TopicInfo.ID_SERVICE_SERVER + + TopicInfo.ID_PUBLISHER] = self.setupServiceServerPublisher + self.callbacks[TopicInfo.ID_SERVICE_SERVER + + TopicInfo.ID_SUBSCRIBER] = self.setupServiceServerSubscriber + self.callbacks[TopicInfo.ID_SERVICE_CLIENT + + TopicInfo.ID_PUBLISHER] = self.setupServiceClientPublisher + self.callbacks[TopicInfo.ID_SERVICE_CLIENT + + TopicInfo.ID_SUBSCRIBER] = self.setupServiceClientSubscriber # custom endpoints self.callbacks[TopicInfo.ID_PARAMETER_REQUEST] = self.handleParameterRequest self.callbacks[TopicInfo.ID_LOG] = self.handleLoggingRequest @@ -402,7 +422,7 @@ def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=Fal def requestTopics(self): """ Determine topics to subscribe/publish. """ rospy.loginfo('Requesting topics...') - + # TODO remove if possible if not self.fix_pyserial_for_test: with self.read_lock: @@ -437,7 +457,8 @@ def tryRead(self, length): bytes_remaining -= len(received) if bytes_remaining != 0: - raise IOError("Returned short (expected %d bytes, received %d instead)." % (length, length - bytes_remaining)) + raise IOError("Returned short (expected %d bytes, received %d instead)." % + (length, length - bytes_remaining)) return bytes(result) except Exception as e: @@ -460,7 +481,8 @@ def run(self): if self.synced: rospy.logerr("Lost sync with device, restarting...") else: - rospy.logerr("Unable to sync with device; possible link problem or link software version mismatch such as hydro rosserial_python with groovy Arduino") + rospy.logerr( + "Unable to sync with device; possible link problem or link software version mismatch such as hydro rosserial_python with groovy Arduino") self.lastsync_lost = rospy.Time.now() self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_NO_SYNC) self.requestTopics() @@ -486,14 +508,19 @@ def run(self): read_step = 'protocol' flag[1] = self.tryRead(1) if flag[1] != self.protocol_ver: - self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_MISMATCHED_PROTOCOL) - rospy.logerr("Mismatched protocol version in packet (%s): lost sync or rosserial_python is from different ros release than the rosserial client" % repr(flag[1])) - protocol_ver_msgs = {'\xff': 'Rev 0 (rosserial 0.4 and earlier)', '\xfe': 'Rev 1 (rosserial 0.5+)', '\xfd': 'Some future rosserial version'} + self.sendDiagnostics( + diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_MISMATCHED_PROTOCOL) + rospy.logerr( + "Mismatched protocol version in packet (%s): lost sync or rosserial_python is from different ros release than the rosserial client" % repr(flag[1])) + protocol_ver_msgs = { + '\xff': 'Rev 0 (rosserial 0.4 and earlier)', '\xfe': 'Rev 1 (rosserial 0.5+)', '\xfd': 'Some future rosserial version'} if flag[1] in protocol_ver_msgs: - found_ver_msg = 'Protocol version of client is ' + protocol_ver_msgs[flag[1]] + found_ver_msg = 'Protocol version of client is ' + \ + protocol_ver_msgs[flag[1]] else: found_ver_msg = "Protocol version of client is unrecognized" - rospy.loginfo("%s, expected %s" % (found_ver_msg, protocol_ver_msgs[self.protocol_ver])) + rospy.loginfo("%s, expected %s" % + (found_ver_msg, protocol_ver_msgs[self.protocol_ver])) continue # Read message length. @@ -508,7 +535,7 @@ def run(self): # Validate message length checksum. if msg_len_checksum % 256 != 255: - rospy.loginfo("wrong checksum for msg length, length %d" %(msg_length)) + rospy.loginfo("wrong checksum for msg length, length %d" % (msg_length)) rospy.loginfo("chk is %d" % ord(msg_len_chk)) continue @@ -522,7 +549,8 @@ def run(self): try: msg = self.tryRead(msg_length) except IOError: - self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_PACKET_FAILED) + self.sendDiagnostics( + diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_PACKET_FAILED) rospy.loginfo("Packet Failed : Failed to read msg data") rospy.loginfo("expected msg length is %d", msg_length) raise @@ -530,7 +558,7 @@ def run(self): # Reada checksum for topic id and msg read_step = 'data checksum' chk = self.tryRead(1) - checksum = sum(map(ord, topic_id_header) ) + sum(map(ord, msg)) + ord(chk) + checksum = sum(map(ord, topic_id_header)) + sum(map(ord, msg)) + ord(chk) # Validate checksum. if checksum % 256 == 255: @@ -575,7 +603,7 @@ def setupPublisher(self, data): self.publishers[msg.topic_id] = pub self.callbacks[msg.topic_id] = pub.handlePacket self.setPublishSize(msg.buffer_size) - rospy.loginfo("Setup publisher on %s [%s]" % (msg.topic_name, msg.message_type) ) + rospy.loginfo("Setup publisher on %s [%s]" % (msg.topic_name, msg.message_type)) except Exception as e: rospy.logerr("Creation of publisher failed: %s", e) @@ -588,14 +616,15 @@ def setupSubscriber(self, data): sub = Subscriber(msg, self) self.subscribers[msg.topic_name] = sub self.setSubscribeSize(msg.buffer_size) - rospy.loginfo("Setup subscriber on %s [%s]" % (msg.topic_name, msg.message_type) ) + rospy.loginfo("Setup subscriber on %s [%s]" % (msg.topic_name, msg.message_type)) elif msg.message_type != self.subscribers[msg.topic_name].message._type: old_message_type = self.subscribers[msg.topic_name].message._type self.subscribers[msg.topic_name].unregister() sub = Subscriber(msg, self) self.subscribers[msg.topic_name] = sub self.setSubscribeSize(msg.buffer_size) - rospy.loginfo("Change the message type of subscriber on %s from [%s] to [%s]" % (msg.topic_name, old_message_type, msg.message_type) ) + rospy.loginfo("Change the message type of subscriber on %s from [%s] to [%s]" % ( + msg.topic_name, old_message_type, msg.message_type)) except Exception as e: rospy.logerr("Creation of subscriber failed: %s", e) @@ -609,7 +638,8 @@ def setupServiceServerPublisher(self, data): srv = self.services[msg.topic_name] except KeyError: srv = ServiceServer(msg, self) - rospy.loginfo("Setup service server on %s [%s]" % (msg.topic_name, msg.message_type) ) + rospy.loginfo("Setup service server on %s [%s]" % + (msg.topic_name, msg.message_type)) self.services[msg.topic_name] = srv if srv.mres._md5sum == msg.md5sum: self.callbacks[msg.topic_id] = srv.handlePacket @@ -628,7 +658,8 @@ def setupServiceServerSubscriber(self, data): srv = self.services[msg.topic_name] except KeyError: srv = ServiceServer(msg, self) - rospy.loginfo("Setup service server on %s [%s]" % (msg.topic_name, msg.message_type) ) + rospy.loginfo("Setup service server on %s [%s]" % + (msg.topic_name, msg.message_type)) self.services[msg.topic_name] = srv if srv.mreq._md5sum == msg.md5sum: srv.id = msg.topic_id @@ -647,7 +678,8 @@ def setupServiceClientPublisher(self, data): srv = self.services[msg.topic_name] except KeyError: srv = ServiceClient(msg, self) - rospy.loginfo("Setup service client on %s [%s]" % (msg.topic_name, msg.message_type) ) + rospy.loginfo("Setup service client on %s [%s]" % + (msg.topic_name, msg.message_type)) self.services[msg.topic_name] = srv if srv.mreq._md5sum == msg.md5sum: self.callbacks[msg.topic_id] = srv.handlePacket @@ -666,7 +698,8 @@ def setupServiceClientSubscriber(self, data): srv = self.services[msg.topic_name] except KeyError: srv = ServiceClient(msg, self) - rospy.loginfo("Setup service client on %s [%s]" % (msg.topic_name, msg.message_type) ) + rospy.loginfo("Setup service client on %s [%s]" % + (msg.topic_name, msg.message_type)) self.services[msg.topic_name] = srv if srv.mres._md5sum == msg.md5sum: srv.id = msg.topic_id @@ -681,7 +714,7 @@ def handleTimeRequest(self, data): t.data = rospy.Time.now() data_buffer = StringIO.StringIO() t.serialize(data_buffer) - self.send( TopicInfo.ID_TIME, data_buffer.getvalue() ) + self.send(TopicInfo.ID_TIME, data_buffer.getvalue()) self.lastsync = rospy.Time.now() def handleParameterRequest(self, data): @@ -692,28 +725,28 @@ def handleParameterRequest(self, data): try: param = rospy.get_param(req.name) except KeyError: - rospy.logerr("Parameter %s does not exist"%req.name) + rospy.logerr("Parameter %s does not exist" % req.name) return if param is None: - rospy.logerr("Parameter %s does not exist"%req.name) + rospy.logerr("Parameter %s does not exist" % req.name) return if isinstance(param, dict): - rospy.logerr("Cannot send param %s because it is a dictionary"%req.name) + rospy.logerr("Cannot send param %s because it is a dictionary" % req.name) return if not isinstance(param, list): param = [param] - #check to make sure that all parameters in list are same type + # check to make sure that all parameters in list are same type t = type(param[0]) for p in param: - if t!= type(p): - rospy.logerr('All Paramers in the list %s must be of the same type'%req.name) + if t != type(p): + rospy.logerr('All Paramers in the list %s must be of the same type' % req.name) return if t == int or t == bool: resp.ints = param if t == float: - resp.floats =param + resp.floats = param if t == str: resp.strings = param data_buffer = StringIO.StringIO() @@ -758,11 +791,13 @@ def _send(self, topic, msg): rospy.logerr("Message from ROS network dropped: message larger than buffer.\n%s" % msg) return -1 else: - #modified frame : header(2 bytes) + msg_len(2 bytes) + msg_len_chk(1 byte) + topic_id(2 bytes) + msg(x bytes) + msg_topic_id_chk(1 byte) + # modified frame : header(2 bytes) + msg_len(2 bytes) + msg_len_chk(1 byte) + topic_id(2 bytes) + msg(x bytes) + msg_topic_id_chk(1 byte) # second byte of header is protocol version - msg_len_checksum = 255 - ( ((length&255) + (length>>8))%256 ) - msg_checksum = 255 - ( ((topic&255) + (topic>>8) + sum([ord(x) for x in msg]))%256 ) - data = "\xff" + self.protocol_ver + chr(length&255) + chr(length>>8) + chr(msg_len_checksum) + chr(topic&255) + chr(topic>>8) + msg_len_checksum = 255 - (((length & 255) + (length >> 8)) % 256) + msg_checksum = 255 - (((topic & 255) + (topic >> 8) + sum([ord(x) for x in msg])) % 256) + data = "\xff" + self.protocol_ver + \ + chr(length & 255) + chr(length >> 8) + \ + chr(msg_len_checksum) + chr(topic & 255) + chr(topic >> 8) data = data + msg + chr(msg_checksum) self._write(data) return length @@ -801,14 +836,14 @@ def sendDiagnostics(self, level, msg_text): status.level = level status.values.append(diagnostic_msgs.msg.KeyValue()) - status.values[0].key="last sync" - if self.lastsync.to_sec()>0: - status.values[0].value=time.ctime(self.lastsync.to_sec()) + status.values[0].key = "last sync" + if self.lastsync.to_sec() > 0: + status.values[0].value = time.ctime(self.lastsync.to_sec()) else: - status.values[0].value="never" + status.values[0].value = "never" status.values.append(diagnostic_msgs.msg.KeyValue()) - status.values[1].key="last sync lost" - status.values[1].value=time.ctime(self.lastsync_lost.to_sec()) + status.values[1].key = "last sync lost" + status.values[1].value = time.ctime(self.lastsync_lost.to_sec()) self.pub_diagnostics.publish(msg)