-
Notifications
You must be signed in to change notification settings - Fork 0
/
gesturereader.py
340 lines (269 loc) · 12.5 KB
/
gesturereader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
import myopython.myo as libmyo
import os
from scipy.spatial import distance
from queue import Queue
from math import pi
from functools import reduce
#Defauts
USE_ORIENTATION = True
USE_POSE = False
USE_GYROSCOPE = False
USE_ACCELEROMETER = True
USE_EMG = False
USE_COUNT = False
BASE_DIR = os.getcwd()
DEFAULT_MYO_PATH = os.path.join(BASE_DIR, "sdk/myo.framework")
VERBOSE = False
NOWRITE = True
def V(text, override = False):
if VERBOSE or override:
print(text + '\n')
def flatten(list):
if list:
return list if type(list) is not list else [item for sublist in list for item in sublist]
return None
# Long term should probably make a lot of these private
class GestureListener(libmyo.DeviceListener):
# run
#TODO pick smart defaults
def __init__(self, end_time_cutoff=40, begin_time_cutoff=5, emg_cutoff=60, gyro_cutoff=30, accel_cutoff=10, orient_cutoff=.1, use_count=USE_COUNT, use_orientation=USE_ORIENTATION, use_gyroscope=USE_GYROSCOPE, use_accelerometer=USE_ACCELEROMETER, use_emg=USE_EMG, use_pose=USE_POSE):
super(GestureListener, self).__init__()
self.count = 0
self.use_count = use_count
self.use_emg = use_emg
self.use_gyroscope = use_gyroscope
self.use_accelerometer = use_accelerometer
self.use_orientation = use_orientation
self.use_pose = use_pose
self.gesturing = False
self.last_movements_buffer = []
self.at_rest_buffer = [True]
self.gesture_data_buffer = []
self.gesture_buffer = Queue()
self.orientation = None
self.pose = libmyo.Pose.rest
self.acceleration = None
self.gyroscope = None
self.emg = None
self.bad_accel_count = 0
self.end_time_cutoff = end_time_cutoff
self.begin_time_cutoff = begin_time_cutoff
self.emg_cutoff = emg_cutoff
self.gyro_cutoff = gyro_cutoff
self.accel_cutoff = accel_cutoff
self.orient_cutoff = orient_cutoff
def has_gesture(self):
return not self.gesture_buffer.empty()
def get_gesture(self):
return self.gesture_buffer.get()
def on_orientation_data(self, myo, timestamp, quat):
myo.set_stream_emg(libmyo.StreamEmg.enabled)
self.orientation = list(quat.rpy)
# normalize a la hello.cpp
self.orientation[0] = int((self.orientation[0] + pi) / (2.0*pi) * 100)
self.orientation[1] = int((self.orientation[1] + pi) / (2.0*pi) * 100)
self.orientation[2] = int((self.orientation[2] + pi) / (2.0*pi) * 100)
if self.use_orientation:
V("Orientation data changed.")
self.handle_state_change()
#self.__update_at_rest()
def on_pose(self, myo, timestamp, pose):
if self.use_pose:
V("Pose has changed from " + str(self.pose) + " to " + str(pose))
self.pose = pose
self.handle_state_change()
# ALWAYS USE FOR DISTANCE FUNCTION SO ALWAYS PERSIST
def on_accelerometor_data(self, myo, timestamp, acceleration):
V("Accleration has changed")
self.acceleration = [acceleration.x*100, acceleration.y*100, acceleration.z*100]
self.handle_state_change()
self.__update_at_rest()
def __update_at_rest(self):
self.at_rest_buffer.append(self.__get_at_rest())
def on_gyroscope_data(self, myo, timestamp, gyroscope):
if self.use_gyroscope:
V("Gyroscope has changed")
self.gyroscope = gyroscope
self.handle_state_change()
def on_emg_data(self, myo, timestamp, emg):
if self.use_emg:
V("Emg data has changed")
self.emg = emg
self.handle_state_change()
def handle_state_change(self):
state = self.__get_state()
self.last_movements_buffer.append(state)
if self.gesturing:
self.gesture_data_buffer.append(state)
self.__clear_big_data_buffers()
self.__check_thresholds_for_gesturing()
def __clear_big_data_buffers(self):
# clear last_movements_buffer if big periodically
if len(self.last_movements_buffer) > self.end_time_cutoff*10:
self.last_movements_buffer = self.last_movements_buffer[-self.end_time_cutoff:]
self.at_rest_buffer = self.at_rest_buffer[-self.end_time_cutoff:]
def __check_thresholds_for_gesturing(self):
# Get the data up to the time threshold
gesture_end_latest_rest = self.at_rest_buffer[-self.end_time_cutoff:]
gesture_begin_latest_rest = self.at_rest_buffer[-self.begin_time_cutoff:]
gesture_cut_off = gesture_end_latest_rest.count(gesture_end_latest_rest[0]) == len(gesture_end_latest_rest)
gesture_begin = gesture_begin_latest_rest[0] == False and gesture_begin_latest_rest.count(gesture_begin_latest_rest[0]) == len(gesture_begin_latest_rest)
# if gesture_begin_latest_rest[0] == False:
# print('latest not at rest')
# print(gesture_begin_latest_rest)
# if gesture_begin_latest_rest.count(gesture_begin_latest_rest[0]) == len(gesture_begin_latest_rest):
# print("YEAH")
# if we've been at rest for time threshold...
if gesture_cut_off:
# if we were gesturing before, store the gesture and reset
if self.gesturing:
self.gesture_buffer.put(self.gesture_data_buffer)
self.gesture_data_buffer = []
self.gesturing = False
self.count = 0
print("Gesture is over")
# ... otherwise we may be gesturing anew
elif gesture_begin:
if not self.gesturing:
self.gesturing = True
print("Beginning a gesture")
def __get_state(self):
if self.gesturing:
self.count+=1
return State(self.pose, self.emg, self.orientation, self.acceleration, self.gyroscope, self.count)
#TODO Make this smarter
def __get_at_rest(self):
if len(self.last_movements_buffer) < 2: #Very first data read
return True
if self.bad_accel_count < 200:
if self.bad_accel_count is 10:
print("Cleaning out bad acceleration data...")
elif self.bad_accel_count is 199:
print("Bad data cleaned! Waiting for a gesture.")
self.bad_accel_count += 1
return True
state_t = self.last_movements_buffer[-1:][0]
state_t_minus_1 = self.last_movements_buffer[-2:-1][0]
# New way, just accelerometer
if state_t.acceleration is None or state_t_minus_1.acceleration is None:
return True
d = distance.euclidean(state_t.acceleration, state_t_minus_1.acceleration) + distance.euclidean(state_t.orientation, state_t_minus_1.orientation)
return self.accel_cutoff > d
# NB MUST BE SAME ORDER AS GET_STATE
# pose is always first, so if that changes we've officially moved
# if self.use_pose and state_t.pose != state_t_minus_1.pose:
# return False
# return whatever is next in order
# cutoff = None
# if self.use_emg and state_t.emg is not None and state_t_minus_1.emg is not None:
# d = distance.euclidean(state_t.emg, state_t_minus_1.emg)
# #print(state_t.emg)
# #print(d)
# return self.emg_cutoff > d
# elif self.use_orientation and state_t.orientation is not None and state_t_minus_1.orientation is not None:
# return self.orient_cutoff > distance.euclidean(state_t.orientation, state_t_minus_1.orientation)
# elif self.use_accelerometer and state_t.acceleration is not None and state_t_minus_1.acceleration is not None:
# return self.accel_cutoff > distance.euclidean(state_t.acceleration, state_t_minus_1.acceleration)
# else:
# return self.gyro_cutoff > distance.euclidean(state_t.gyroscope, state_t_minus_1.gyroscope)
#region Not so important overrides
def on_arm_sync(self, myo, timestamp, arm, x_direction, rotation, warmup_state):
V("Myo has synced, dude " + str(timestamp))
myo.vibrate("short")
def on_arm_unsync(self, myo, timestamp):
V("Myo is lost now " + str(timestamp))
myo.vibrate("medium")
def on_pair(self, myo, timestamp, firmware_version):
V("Myo has paired, bitches. Time for some ASL at " + str(timestamp))
def on_unpair(self, myo, timestamp):
V("Myo says 'deuces' at " + str(timestamp))
#endregion
class State(object):
"""docstring for State"""
def __init__(self, pose, emg, orientation, acceleration, gyroscope, count):
super(State, self).__init__()
self.pose = pose
self.emg = emg
self.orientation = orientation
self.acceleration = acceleration
self.gyroscope = gyroscope
self.count = count
# TODO better spring handling here for items ala orientation
def __str__(self):
emg = str(self.emg)[1:-1].replace(" ", "") if self.emg is not None else None
orientation = str(self.orientation)[1:-1].replace(" ", "") if self.orientation is not None else None
acceleration = str(int(self.acceleration[0])) + "," + str(int(self.acceleration[1])) + "," + str(int(self.acceleration[2])) if self.acceleration is not None else None
gyroscope = str(self.gyroscope)[1:-1].replace(" ", "") if self.gyroscope is not None else None
count = str(count) if USE_COUNT else None
#str(self.pose.value)
items = [emg, orientation, acceleration, gyroscope, count]
return ",".join(filter(None,items))
def __repr__(self):
return self.__str__()
class GestureData(object):
"""docstring for GestureData"""
def __init__(self, data):
super(GestureData, self).__init__()
self.all_data = data
self.hand_data = GestureData.__create_hand_data(data)
self.arm_data = GestureData.__create_arm_data(data)
def as_classification_list(self):
string = ""
for state in self.all_data:
string += str(state) + "\n"
return string
def __create_hand_data(all_data):
return all_data #TODO
def __create_arm_data(all_data):
return all_data #TODO
class GestureReader(object):
"""
An object for chunking and reading gestures from the myo armband.
Ideally, this would run in a thread. Instead, to get useful results,
call the readGesture method repeatedly, and the class will chuck gestures
for you. If you call it slowly, it won't do very much for you...
Takes optional parameters
myoPath -> a string pointing to the myo SDK
geture_time_cutoff -> an integer representing the time required to divide
gestures by stillness in units of .05 seconds. Defaults to 40 (2 seconds)
geture_distance_cutoff -> an integer representing the maximum distance that
constitutes a new gesture in terms of squared unit myo quarternions
"""
def __init__(self, myoPath=None):
super(GestureReader, self).__init__()
libmyo.init(myoPath if myoPath is not None else DEFAULT_MYO_PATH)
self.listener = GestureListener()
def __enter__(self):
self.hub = libmyo.Hub()
self.hub.run(int(1000/20), self.listener) # 1000/20 = 50 i.e. 20 times per second (1000 ms)
try:
self.hub.set_locking_policy(libmyo.LockingPolicy.none)
except:
print("locking policy failed")
return self
def __exit__(self, type, value, traceback):
self.hub.shutdown()
def readGesture(self):
while (True):
if self.listener.has_gesture():
return GestureData(self.listener.get_gesture())
WORD = 'goodbye'
NAME = 'John'
if __name__ == '__main__':
counter = 0
filename = os.path.join(BASE_DIR, "training/" + NAME + "/" + WORD + str(counter))
with GestureReader() as gestureReader:
print("Word you are recording is " + WORD)
while(True):
print("Waiting for your next gesture... " + "\n")
gestureData = gestureReader.readGesture()
if (gestureData):
data = ""
for d in gestureData.all_data:
print(d)
data += str(d) + "\n"
counter+=1
if not NOWRITE:
with open(filename, '+w') as file:
file.write(data)
filename = os.path.join(BASE_DIR, "training/" + NAME + "/" + WORD + str(counter))