-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathagent_wrapper.py
104 lines (88 loc) · 4.31 KB
/
agent_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env python
# Copyright (c) 2019 Intel Corporation
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
"""
Wrapper for autonomous agents required for tracking and checking of used sensors
"""
from __future__ import print_function
import carla
from srunner.autoagents.sensor_interface import CallBack
from srunner.scenariomanager.carla_data_provider import CarlaDataProvider
class AgentWrapper(object):
"""
Wrapper for autonomous agents required for tracking and checking of used sensors
"""
_agent = None
_sensors_list = []
def __init__(self, agent):
"""
Set the autonomous agent
"""
self._agent = agent
def __call__(self):
"""
Pass the call directly to the agent
"""
return self._agent()
def setup_sensors(self, vehicle, debug_mode=False):
"""
Create the sensors defined by the user and attach them to the ego-vehicle
:param vehicle: ego vehicle
:return:
"""
bp_library = CarlaDataProvider.get_world().get_blueprint_library()
for sensor_spec in self._agent.sensors():
# These are the sensors spawned on the carla world
print("!!!!!! " + str(sensor_spec['type']))
if str(sensor_spec['type']) in ['sensor.can_bus', 'sensor.lidar.ray_cast', 'sensor.hd_map', 'sensor.speedometer']:
continue
bp = bp_library.find(str(sensor_spec['type']))
if sensor_spec['type'].startswith('sensor.camera'):
bp.set_attribute('image_size_x', str(sensor_spec['width']))
bp.set_attribute('image_size_y', str(sensor_spec['height']))
bp.set_attribute('fov', str(sensor_spec['fov']))
sensor_location = carla.Location(x=sensor_spec['x'], y=sensor_spec['y'],
z=sensor_spec['z'])
sensor_rotation = carla.Rotation(pitch=sensor_spec['pitch'],
roll=sensor_spec['roll'],
yaw=sensor_spec['yaw'])
elif sensor_spec['type'].startswith('sensor.lidar'):
bp.set_attribute('range', str(sensor_spec['range']))
bp.set_attribute('rotation_frequency', str(sensor_spec['rotation_frequency']))
bp.set_attribute('channels', str(sensor_spec['channels']))
bp.set_attribute('upper_fov', str(sensor_spec['upper_fov']))
bp.set_attribute('lower_fov', str(sensor_spec['lower_fov']))
bp.set_attribute('points_per_second', str(sensor_spec['points_per_second']))
sensor_location = carla.Location(x=sensor_spec['x'], y=sensor_spec['y'],
z=sensor_spec['z'])
sensor_rotation = carla.Rotation(pitch=sensor_spec['pitch'],
roll=sensor_spec['roll'],
yaw=sensor_spec['yaw'])
elif sensor_spec['type'].startswith('sensor.other.gnss'):
sensor_location = carla.Location(x=sensor_spec['x'], y=sensor_spec['y'],
z=sensor_spec['z'])
sensor_rotation = carla.Rotation()
# create sensor
sensor_transform = carla.Transform(sensor_location, sensor_rotation)
sensor = CarlaDataProvider.get_world().spawn_actor(bp, sensor_transform, vehicle)
# setup callback
print("????")
print(sensor.type_id)
sensor.listen(CallBack(sensor_spec['id'], sensor, self._agent.sensor_interface))
self._sensors_list.append(sensor)
# while not self._agent.all_sensors_ready():
# if debug_mode:
# print(" waiting for one data reading from sensors...")
# CarlaDataProvider.get_world().tick()
def cleanup(self):
"""
Remove and destroy all sensors
"""
for i, _ in enumerate(self._sensors_list):
if self._sensors_list[i] is not None:
self._sensors_list[i].stop()
self._sensors_list[i].destroy()
self._sensors_list[i] = None
self._sensors_list = []