-
Notifications
You must be signed in to change notification settings - Fork 0
/
wrappers.py
346 lines (283 loc) · 12.7 KB
/
wrappers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
import carla
import random
import time
import collections
import math
import numpy as np
import weakref
import pygame
import psutil
from skimage.transform import resize
def print_transform(transform):
print("Location(x={:.2f}, y={:.2f}, z={:.2f}) Rotation(pitch={:.2f}, yaw={:.2f}, roll={:.2f})".format(
transform.location.x,
transform.location.y,
transform.location.z,
transform.rotation.pitch,
transform.rotation.yaw,
transform.rotation.roll
)
)
def get_actor_display_name(actor, truncate=250):
name = " ".join(actor.type_id.replace("_", ".").title().split(".")[1:])
return (name[:truncate-1] + u"\u2026") if len(name) > truncate else name
def angle_diff(v0, v1):
""" Calculates the signed angle difference (-pi, pi] between 2D vector v0 and v1 """
angle = np.arctan2(v1[1], v1[0]) - np.arctan2(v0[1], v0[0])
if angle > np.pi: angle -= 2 * np.pi
elif angle <= -np.pi: angle += 2 * np.pi
return angle
def distance_to_line(A, B, p):
num = np.linalg.norm(np.cross(B - A, A - p))
denom = np.linalg.norm(B - A)
if np.isclose(denom, 0):
return np.linalg.norm(p - A)
return num / denom
def vector(v):
""" Turn carla Location/Vector3D/Rotation to np.array """
if isinstance(v, carla.Location) or isinstance(v, carla.Vector3D):
return np.array([v.x, v.y, v.z])
elif isinstance(v, carla.Rotation):
return np.array([v.pitch, v.yaw, v.roll])
def kill_process():
for process in psutil.process_iter(): # kill the existing Carla process
if process.name().lower().startswith('CarlaUE4'.lower()):
try:
process.terminate()
except:
pass
still_alive = []
for process in psutil.process_iter():
if process.name().lower().startswith('CarlaUE4'.lower()):
still_alive.append(process)
if len(still_alive):
for process in still_alive:
try:
process.kill()
except:
pass
psutil.wait_procs(still_alive)
CAMERA_TRANSFORMS = {
"spectator": carla.Transform(carla.Location(x=-5.5, z=2.8), carla.Rotation(pitch=-15)),
"dashboard": carla.Transform(carla.Location(x=1.6, z=1.7)),
"lidar_pos": carla.Transform(carla.Location(x=0.0, z=2.1))
}
#===============================================================================
# CarlaActorBase
#===============================================================================
class CarlaActorBase(object):
def __init__(self, world, actor):
self.world = world
self.actor = actor
self.world.actor_list.append(self)
self.destroyed = False
def destroy(self):
if self.destroyed:
raise Exception("Actor already destroyed.")
else:
print("Destroying ", self, "...")
self.actor.destroy()
self.world.actor_list.remove(self)
self.destroyed = True
def get_carla_actor(self):
return self.actor
def tick(self):
pass
def __getattr__(self, name):
"""Relay missing methods to underlying carla actor"""
return getattr(self.actor, name)
#===============================================================================
# CollisionSensor
#===============================================================================
class CollisionSensor(CarlaActorBase):
def __init__(self, world, vehicle, on_collision_fn):
self.on_collision_fn = on_collision_fn
# Collision history
self.history = []
# Setup sensor blueprint
bp = world.get_blueprint_library().find("sensor.other.collision")
# Create and setup sensor
weak_self = weakref.ref(self)
actor = world.spawn_actor(bp, carla.Transform(), attach_to=vehicle.get_carla_actor())
actor.listen(lambda event: CollisionSensor.on_collision(weak_self, event))
super().__init__(world, actor)
@staticmethod
def on_collision(weak_self, event):
self = weak_self()
if not self:
return
# Call on_collision_fn
if callable(self.on_collision_fn):
self.on_collision_fn(event)
#===============================================================================
# LaneInvasionSensor
#===============================================================================
class LaneInvasionSensor(CarlaActorBase):
def __init__(self, world, vehicle, on_invasion_fn):
self.on_invasion_fn = on_invasion_fn
# Setup sensor blueprint
bp = world.get_blueprint_library().find("sensor.other.lane_invasion")
# Create sensor
weak_self = weakref.ref(self)
actor = world.spawn_actor(bp, carla.Transform(), attach_to=vehicle.get_carla_actor())
actor.listen(lambda event: LaneInvasionSensor.on_invasion(weak_self, event))
super().__init__(world, actor)
@staticmethod
def on_invasion(weak_self, event):
self = weak_self()
if not self:
return
# Call on_invasion_fn
if callable(self.on_invasion_fn):
self.on_invasion_fn(event)
#===============================================================================
# Lidar
#===============================================================================
class Lidar(CarlaActorBase):
def __init__(self, world, obs_res, transform=carla.Transform(),
lidar_params = None,
attach_to=None, on_recv_info=None,
sensor_type="sensor.lidar.ray_cast"):
self.on_recv_info = on_recv_info
self.lidar_params = lidar_params
self.obs_res = obs_res
# Setup camera blueprint
lidar_bp = world.get_blueprint_library().find(sensor_type)
lidar_bp.set_attribute('channels', '32')
lidar_bp.set_attribute('range', '500')
# Create and setup camera actor
weak_self = weakref.ref(self)
actor = world.spawn_actor(lidar_bp, transform, attach_to=attach_to.get_carla_actor())
actor.listen(lambda data: Lidar.process_lidar_input(weak_self, data))
print("Spawned actor \"{}\"".format(actor.type_id))
super().__init__(world, actor)
@staticmethod
def process_lidar_input(weak_self, data):
self = weak_self()
if not self:
return
if callable(self.on_recv_info):
# Lidar image generation
point_cloud = []
# Get point cloud data
for location in data:
point_cloud.append([location.point.x, location.point.y, -location.point.z])
point_cloud = np.array(point_cloud)
# Separate the 3D space to bins for point cloud, x and y is set according to CARLA_PARAMS.lidar_bin,
# and z is set to be two bins.
y_bins = np.arange(-(self.lidar_params['lidar_obs_range'] - self.lidar_params['d_behind']),
self.lidar_params['d_behind']+self.lidar_params['lidar_bin'],
self.lidar_params['lidar_bin'])
x_bins = np.arange( -self.lidar_params['lidar_obs_range']/2,
self.lidar_params['lidar_obs_range']/2+self.lidar_params['lidar_bin'],
self.lidar_params['lidar_bin'])
z_bins = [-self.lidar_params['lidar_height']-1, -self.lidar_params['lidar_height']+0.25, 1]
# Get lidar image according to the bins
lidar, _ = np.histogramdd(point_cloud, bins=(x_bins, y_bins, z_bins))
lidar[:, :, 0] = np.array(lidar[:, :, 0] > 0, dtype=np.uint8)
lidar[:, :, 1] = np.array(lidar[:, :, 1] > 0, dtype=np.uint8)
lidar = np.flipud(lidar)
# # Add the waypoints to lidar image
# if CARLA_PARAMS.display_route_in_lidar:
# wayptimg = (birdeye[:, :, 0] <= 10) * \
# (birdeye[:, :, 1] <= 10) * (birdeye[:, :, 2] >= 240)
# else:
# # wayptimg = birdeye[:, :, 0] < 0 # Equal to a zero matrix
wayptimg = np.zeros((self.obs_res, self.obs_res))
wayptimg = np.roll(wayptimg, int( 3.5*self.obs_res/self.lidar_params['lidar_obs_range']), axis=1)
wayptimg = np.roll(wayptimg, int( -4*self.obs_res/self.lidar_params['lidar_obs_range']), axis=0)
wayptimg[:-30,:] = 0
wayptimg = np.expand_dims(wayptimg, axis=2)
# Get the final lidar image
lidar = np.concatenate((lidar, wayptimg), axis=2)
lidar = lidar * 255
self.on_recv_info(lidar)
def destroy(self):
super().destroy()
#===============================================================================
# Camera
#===============================================================================
class Camera(CarlaActorBase):
def __init__(self, world, obs_res, transform=carla.Transform(),
sensor_tick=0.0, attach_to=None, on_recv_image=None,
camera_type="sensor.camera.rgb", color_converter=carla.ColorConverter.Raw):
self.on_recv_image = on_recv_image
self.color_converter = color_converter
self.obs_res = obs_res
# Setup camera blueprint
camera_bp = world.get_blueprint_library().find(camera_type)
camera_bp.set_attribute("image_size_x", str(obs_res))
camera_bp.set_attribute("image_size_y", str(obs_res))
camera_bp.set_attribute("sensor_tick", str(sensor_tick))
camera_bp.set_attribute('fov', '110')
# Create and setup camera actor
weak_self = weakref.ref(self)
actor = world.spawn_actor(camera_bp, transform, attach_to=attach_to.get_carla_actor())
actor.listen(lambda image: Camera.process_camera_input(weak_self, image))
print("Spawned actor \"{}\"".format(actor.type_id))
super().__init__(world, actor)
@staticmethod
def process_camera_input(weak_self, image):
self = weak_self()
if not self:
return
if callable(self.on_recv_image):
image.convert(self.color_converter)
array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8"))
array = np.reshape(array, (image.height, image.width, 4))
array = array[:, :, :3]
array = array[:, :, ::-1]
array = resize(array, (self.obs_res, self.obs_res)) * 255
self.on_recv_image(array)
def destroy(self):
super().destroy()
#===============================================================================
# Vehicle
#===============================================================================
class Vehicle(CarlaActorBase):
""" spawn vehicle equipped with the collision sensor and lane invansion sensor. """
def __init__(self, world, transform=carla.Transform(),
on_collision_fn=None, on_invasion_fn=None,
vehicle_type="vehicle.lincoln.mkz2017"):
# Setup vehicle blueprint
self.vehicle_bp = world.get_blueprint_library().find(vehicle_type)
color = self.vehicle_bp.get_attribute("color").recommended_values[0]
self.vehicle_bp.set_attribute("color", color)
# Create vehicle actor
actor = world.spawn_actor(self.vehicle_bp, transform)
print("Spawned actor \"{}\"".format(actor.type_id))
super().__init__(world, actor)
# Maintain vehicle control
self.control = carla.VehicleControl()
if callable(on_collision_fn):
self.collision_sensor = CollisionSensor(world, self, on_collision_fn=on_collision_fn)
if callable(on_invasion_fn):
self.lane_sensor = LaneInvasionSensor(world, self, on_invasion_fn=on_invasion_fn)
def tick(self):
self.actor.apply_control(self.control)
def get_speed(self):
velocity = self.get_velocity()
return np.sqrt(velocity.x**2 + velocity.y**2 + velocity.z**2)
def get_closest_waypoint(self):
return self.world.map.get_waypoint(self.get_transform().location, project_to_road=True)
#===============================================================================
# World
#===============================================================================
class World():
def __init__(self, client):
self.world = client.get_world()
self.map = self.get_map()
self.actor_list = []
def tick(self):
for actor in list(self.actor_list):
actor.tick()
self.world.tick()
def destroy(self):
print("Destroying all spawned actors")
for actor in list(self.actor_list):
actor.destroy()
def get_carla_world(self):
return self.world
def __getattr__(self, name):
"""Relay missing methods to underlying carla object"""
return getattr(self.world, name)