-
Notifications
You must be signed in to change notification settings - Fork 0
/
robotraconteur_ati_driver.py
127 lines (97 loc) · 4.02 KB
/
robotraconteur_ati_driver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import RobotRaconteur as RR
RRN = RR.RobotRaconteurNode.s
import RobotRaconteurCompanion as RRC
import argparse
import sys
import platform
import threading
import numpy as np
import time
import rpi_ati_net_ft
class ATIDriver(object):
def __init__(self, host):
self._lock = threading.RLock()
self._recv_lock=threading.RLock()
self._streaming = False
self.host = host
self.ati_obj = rpi_ati_net_ft.NET_FT(host)
self.ati_obj.set_tare_from_ft()
print(self.ati_obj.read_ft_http())
print(self.ati_obj.try_read_ft_http())
self.ati_obj.start_streaming()
# publish rate
self.rate = 1e-3
## for testing ##
self.last_ft = np.zeros(6)
def srv_start_streaming(self):
with self._lock:
if (self._streaming):
raise Exception("Already Streaming")
# start data streaming
self._streaming = True
self.t = threading.Thread(target=self.stream_loop)
self.t.start()
def srv_stop_streaming(self):
self._streaming = False
self.t.join()
def stream_loop(self):
while self._streaming:
st = time.time()
if(not self._streaming): return
res, ft, status = self.ati_obj.try_read_ft_streaming(.1)
if res:
self.send_sensor_val(ft)
else:
print("Error reading FT sensor, resetting")
self.ati_obj = rpi_ati_net_ft.NET_FT(self.host)
self.ati_obj.set_tare_from_ft()
print(self.ati_obj.read_ft_http())
print(self.ati_obj.try_read_ft_http())
self.ati_obj.start_streaming()
elapse = time.time()-st
if self.rate > elapse:
time.sleep(self.rate - elapse)
def send_sensor_val(self, ft):
# create the new namedarray
arraytype = RRN.GetNamedArrayDType('com.robotraconteur.geometry.Wrench')
# print(arraytype)
msg_arr = np.zeros((1,),dtype=arraytype)
# print(msg_arr)
# pack the data to the structure to send to the client
msg_arr[0]['torque']['x'] = ft[0]
msg_arr[0]['torque']['y'] = ft[1]
msg_arr[0]['torque']['z'] = ft[2]
msg_arr[0]['force']['x'] = ft[3]
msg_arr[0]['force']['y'] = ft[4]
msg_arr[0]['force']['z'] = ft[5]
# print(msg_arr)
self.wrench_sensor_value.OutValue=msg_arr
def setf_param(self, param_name, value):
if param_name == "set_tare":
self.ati_obj.set_tare_from_ft()
def main():
parser = argparse.ArgumentParser(description="ATI force torque sensor driver service for Robot Raconteur")
parser.add_argument("--sensor-ip", type=str, default="192.168.50.65", help="the ip address of the ati sensor")
parser.add_argument("--wait-signal",action='store_const',const=True,default=False, help="wait for SIGTERM orSIGINT (Linux only)")
args,_ = parser.parse_known_args()
# not yet know what this do
rr_args = ["--robotraconteur-jumbo-message=true"] + sys.argv
RRC.RegisterStdRobDefServiceTypes(RRN)
ati_obj = ATIDriver(args.sensor_ip)
with RR.ServerNodeSetup("com.robotraconteur.sensor.wrenchsensor",59823,argv=rr_args):
service_ctx = RRN.RegisterService("ati_sensor","com.robotraconteur.sensor.WrenchSensor",ati_obj)
ati_obj.srv_start_streaming()
if args.wait_signal:
#Wait for shutdown signal if running in service mode
print("Press Ctrl-C to quit...")
import signal
signal.sigwait([signal.SIGTERM,signal.SIGINT])
else:
#Wait for the user to shutdown the service
if (sys.version_info > (3, 0)):
input("Server started, press enter to quit...")
else:
raw_input("Server started, press enter to quit...")
ati_obj.srv_stop_streaming()
if __name__ == "__main__":
main()