"""Multi-agent trail following in the AirSim gym environment.

Each AirSimSingleAgent explores until it reaches the goal pose, then commits
its trajectory to a trail of waypoints shared by all agents.
"""

import math
import random
import time

import torch
import gym
import gym_airsim  # registers the 'AirSim-v1' environment
from PIL import Image

from multi_agent import MultiAgent
import constants


class AirSimSingleAgent:
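    """One AirSim explorer.

    Follows the shared trail when a waypoint matches the current
    observation; otherwise falls back to a scripted action sequence.
    """
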
def __init__(self, placeRecognition, trail, navigation):
self.env = gym.make('AirSim-v1')
self.placeRecognition = placeRecognition
self.trail = trail
self.navigation = navigation
self.agent_state = constants.MULTI_AGENT_STATE_SEARCH
self.temporary_trail = []
self.goal = None
self.init = self.env.reset()
self.cycle = 0
self.current_state = None
self.previous_action = 0
self.placeRecognition.model.eval()
self.navigation.model.eval()
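        # Scripted fallback action sequence used when no trail waypoint
        # matches. The numeric action semantics are an assumption; the
        # inverse-pair checks in search() suggest 0/3, 1/2 and 4/5 are
        # opposite motion commands.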
self.test_actions = [0, 0, 0, 0, 3, 0, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 4, 0, 5, 0, 0]
self.test_action_id = 0
def reset_episode(self):
self.test_action_id = 0
        # Fixed start pose for every episode (AirSim uses NED coordinates,
        # so z = -6 is above the ground).
        init_position, init_orientation = [10, 0, -6], [0, 0, 0]
        self.env.set_initial_pose(init_position, init_orientation)
        self.env.set_mode(constants.AIRSIM_MODE_REPEAT)
        time.sleep(1)  # give the simulator a moment to apply the new pose
state = self.env.reset()
self.init = state
self.current_state = state
self.temporary_trail = [state]
self.trail.clear_sequence()
return state
    def goal_reached(self, pose):
        # Hard-coded goal pose (x, y, yaw), each within a 0.5 tolerance.
        # return (self.test_action_id >= len(self.test_actions))  # TODO: temporary
        return (math.fabs(pose['x_pos'] - 12.6) < 0.5 and
                math.fabs(pose['y_pos'] - 1.8) < 0.5 and
                math.fabs(pose['yaw'] - 1.2) < 0.5)
    def search(self):
        print("pose:", self.env.get_position_orientation())
        # Alternatives: find_closest_waypoint / find_most_similar_waypoint.
        index, score, velocity = self.trail.find_best_waypoint(self.current_state)
        # TODO: optionally also explore at random with probability
        # constants.MULTI_AGENT_RANDOM_MOVEMENT_CHANCE.
        if index == -1:  # no waypoint matched the current observation
            # Fall back to the scripted exploration sequence; guard against
            # running past its end with a random action.
            if self.test_action_id < len(self.test_actions):
                action = self.test_actions[self.test_action_id]
                self.test_action_id += 1
            else:
                action = random.randint(0, constants.LOCO_NUM_CLASSES - 1)
            next_state, _, done, info = self.env.step(action)
            print(action)
        else:
            # if index + 1 >= self.trail.len():
            #     future_state = self.trail.waypoints[index].state
            # else:
            #     future_state = self.trail.waypoints[index + 1].state
            future_state = self.trail.waypoints[index].state
            actions = self.navigation.forward(self.current_state, self.trail.waypoints[index].state, future_state)
            actions = torch.squeeze(actions)
            sorted_actions, indices = torch.sort(actions, descending=True)
            action = indices[0].item()
            # Avoid immediately undoing the previous step: if the top-ranked
            # action is the inverse of the last one, take the runner-up.
            # Inverse pairs (from the original checks): 0<->3, 1<->2, 4<->5.
            opposites = {0: 3, 3: 0, 1: 2, 2: 1, 4: 5, 5: 4}
            if opposites.get(self.previous_action) == action:
                action = indices[1].item()
            print("matched:", index, score, actions)
            next_state, _, done, info = self.env.step(action)
            # Debug: dump the matched waypoint's image for inspection.
            # current_image = Image.fromarray(self.current_state)
            future_image = Image.fromarray(future_state)
            # current_image.save("current.png", "PNG")
            future_image.save("future.png", "PNG")
self.previous_action = action
self.current_state = next_state
self.temporary_trail.append(next_state)
return info
    def update(self, cycle):
        self.cycle = cycle
        if self.agent_state == constants.MULTI_AGENT_STATE_SEARCH:
            if len(self.temporary_trail) > 100:  # TODO: temporary episode length cap
                print("trail got too long, resetting episode")
                self.reset_episode()
pose = self.search()
            if self.goal_reached(pose):
                # self.agent_state = constants.MULTI_AGENT_STATE_HOME
                # Commit the successful trajectory to the shared trail; each
                # state is annotated with its remaining steps to the goal.
                steps_to_goal = len(self.temporary_trail)
                print("goal reached, trail len:", len(self.temporary_trail))
                for state in self.temporary_trail:
                    self.trail.append_waypoint(input=state, created_at=self.cycle, steps_to_goal=steps_to_goal)
                    steps_to_goal -= 1
                self.reset_episode()
class MultiAirSimAgent(MultiAgent):
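    """Coordinates several AirSimSingleAgent instances sharing one trail."""
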
def __init__(self, placeRecognition, navigation):
        super().__init__(placeRecognition, navigation)
self.placeRecognition = placeRecognition
self.navigation = navigation
self.cycle = 0
self.agents = []
self.placeRecognition.model.eval()
self.navigation.model.eval()
        # Every agent shares the trail owned by the MultiAgent base class.
        for i in range(constants.MULTI_NUM_AGENTS):
            self.agents.append(AirSimSingleAgent(self.placeRecognition, self.trail, self.navigation))
    def run(self):
        for i in range(constants.MULTI_NUM_AGENTS):
            self.agents[i].reset_episode()

        while True:
            self.cycle += 1
            for i in range(constants.MULTI_NUM_AGENTS):
                self.agents[i].update(self.cycle)
            self.trail.update_waypoints()
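

# --- Usage sketch (assumption) ---------------------------------------------
# A minimal driver, assuming PlaceRecognition and Navigation wrappers with
# pretrained `.model` attributes exist elsewhere in this repo; the import
# paths below are hypothetical, so the block is left commented out.
#
# if __name__ == "__main__":
#     from place_recognition import PlaceRecognition  # hypothetical path
#     from navigation import Navigation               # hypothetical path
#
#     place_recognition = PlaceRecognition()
#     navigation = Navigation()
#     agent = MultiAirSimAgent(place_recognition, navigation)
#     agent.run()  # loops forever; episodes reset internally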