-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnaoobject.py
198 lines (166 loc) · 6.9 KB
/
naoobject.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# encoding: utf-8
from __future__ import print_function
import cv2
import vision_definitions
import numpy as np
class NAOCamera(object):
# Header for BMP picture
BMPHeader = '\x42\x4d' \
'\x36\x84\x03\x00' \
'\x00\x00\x00\x00' \
'\x36\x00\x00\x00' \
'\x28\x00\x00\x00' \
'\x40\x01\x00\x00' \
'\xf0\x00\x00\x00' \
'\x01\x00\x18\x00' \
'\x00\x00\x00\x00\x00\x00\x00\x00' \
'\x00\x00\x00\x00\x00\x00\x00\x00' \
'\x00\x00\x00\x00\x00\x00\x00\x00'
# parameter for camera
resolution = vision_definitions.kQVGA
colorSpace = vision_definitions.kBGRColorSpace
fps = 40
# number of saved pictures
ImgNo = 0
def __init__(self, proxy):
self.__camProxy = proxy
# name for two cameras
self.__nameId = [None, None]
self.subscribe()
def __del__(self):
self.un_subscribe()
def subscribe(self):
"""
subscribe to the two cameras of NAO
:return: True if success
"""
self.__nameId[0] = self.__camProxy.subscribeCamera('Camera_0', 0,
NAOCamera.resolution,
NAOCamera.colorSpace,
NAOCamera.fps)
self.__nameId[1] = self.__camProxy.subscribeCamera('Camera_1', 1,
NAOCamera.resolution,
NAOCamera.colorSpace,
NAOCamera.fps)
if self.__nameId[0] is not None and self.__nameId[1] is not None:
return True
else:
return False
def un_subscribe(self):
"""
unsubscribe to the twos cameras
:return: True if success
"""
if self.__nameId[0] is None and self.__nameId[1] is None:
pass
else:
self.__camProxy.unsubscribe(self.__nameId[0])
self.__camProxy.unsubscribe(self.__nameId[1])
self.__nameId = [None, None]
return True
def get_frame(self):
"""
receive data packages from NAO and transform them into frames
:return: {frame of camera 0, frame of camera 1}
"""
img = [None, None]
# get data packages of two cameras from NAO
img[0] = self.__camProxy.getImageRemote(self.__nameId[0])
self.__camProxy.releaseImage(self.__nameId[0])
img[1] = self.__camProxy.getImageRemote(self.__nameId[1])
self.__camProxy.releaseImage(self.__nameId[1])
# process the data packages and get frames
if img[0] is not None:
# decode
img[0] = np.fromstring(NAOCamera.BMPHeader + img[0][6], dtype=np.uint8)
img[0] = cv2.imdecode(img[0], 1)
img[0] = cv2.flip(img[0], 0)
if img[1] is not None:
# decode
img[1] = np.fromstring(NAOCamera.BMPHeader + img[1][6], dtype=np.uint8)
img[1] = cv2.imdecode(img[1], 1)
img[1] = cv2.flip(img[1], 0)
return img
def run(self):
"""
run the client and display the video
:return: None
"""
cv2.namedWindow('Camera_0', 1)
cv2.namedWindow('Camera_1', 1)
try:
while True:
try:
frame = self.get_frame()
# display
cv2.imshow('Camera_0', frame[0])
cv2.imshow('Camera_1', frame[1])
except KeyboardInterrupt:
break
key = cv2.waitKey(10)
if key == 27:
break
elif key == ord('s'):
cv2.imwrite('No%d_Camera0.bmp' % NAOCamera.ImgNo, frame[0])
cv2.imwrite('No%d_Camera1.bmp' % NAOCamera.ImgNo, frame[1])
NAOCamera.ImgNo += 1
finally:
cv2.destroyAllWindows()
class NAOJoint(object):
LINK_LIST = {"LArm": ['LShoulderPitch', 'LShoulderRoll', 'LElbowYaw', 'LElbowRoll', 'LWristYaw'],
"RArm": ['RShoulderPitch', 'RShoulderRoll', 'RElbowYaw', 'RElbowRoll', 'RWristYaw'],
"LLeg": ['LHipYawPitch', 'LHipRoll', 'LHipPitch', 'LKneePitch', 'LAnklePitch', 'LAnkleRoll'],
"RLeg": ['RHipYawPitch', 'RHipRoll', 'RHipPitch', 'RKneePitch', 'RAnklePitch', 'RAnkleRoll'],
"Head": ['HeadYaw', 'HeadPitch'],
"Body": []}
def __init__(self, proxy, link_list):
self.NameList = []
self.proxy = proxy
self.Link = []
# 根据需要控制的连杆,存储需要控制的关节
for link in link_list:
if link in NAOJoint.LINK_LIST.keys():
self.Link = self.Link + [link]
self.NameList = self.NameList + NAOJoint.LINK_LIST[link]
else:
raise RuntimeError('Invalid link.')
def get_stiffness(self):
return self.proxy.getStiffnesses(self.Link)
def set_stiffness(self, value):
self.proxy.setStiffnesses(self.Link, value)
def rest(self):
self.proxy.rest()
def __del__(self):
self.set_stiffness(0.0)
def set_joint(self, angle_list, time_list):
"""
Set angles of motors on a link.
This method is realized with a 'post' method, which is not blocking.
:param angle_list: [[angles at t0], [angles at t1], ..., [angles at tn]]
:param time_list: [t0, t1, ..., tn]
:return: id: thread id for the 'post' method
"""
if len(angle_list[0]) == len(self.NameList) and len(angle_list) == len(time_list):
angle_list = [[row[col] for row in angle_list] for col in range(len(angle_list[0]))]
time_list = [time_list] * len(angle_list)
id = self.proxy.post.angleInterpolation(self.NameList, angle_list, time_list, True)
else:
raise RuntimeError('Length of parameters does not correspond to that of links.')
return id
def get_joint(self):
return self.proxy.getAngles(self.NameList, True)
class NAOPosture(object):
POSE_LIST = ["Stand", "StandInit", "StandZero", "Crouch", "Sit", "SitRelax", "LyingBelly", "LyingBack"]
def __init__(self, proxy):
self.postureProxy = proxy
def pose(self, name):
if name in NAOPosture.POSE_LIST:
if self.postureProxy is not None:
result = self.postureProxy.goToPosture(name, 0.8)
if result:
print(name, ' Success.')
else:
raise RuntimeError(
"%s is not a part of the standard posture library or robot cannot reach the posture" % name)
else:
raise RuntimeError("Invalid posture.")