forked from caomw/icra2017-visual-navigation-1
-
Notifications
You must be signed in to change notification settings - Fork 0
/
scene_loader.py
158 lines (127 loc) · 5.16 KB
/
scene_loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# -*- coding: utf-8 -*-
import sys
import h5py
import json
import numpy as np
import random
import skimage.io
from skimage.transform import resize
from constants import ACTION_SIZE
from constants import SCREEN_WIDTH
from constants import SCREEN_HEIGHT
from constants import HISTORY_LENGTH
class THORDiscreteEnvironment(object):
def __init__(self, config=dict()):
# configurations
self.scene_name = config.get('scene_name', 'bedroom_04')
self.random_start = config.get('random_start', True)
self.n_feat_per_locaiton = config.get('n_feat_per_locaiton', 1) # 1 for no sampling
self.terminal_state_id = config.get('terminal_state_id', 0)
self.h5_file_path = config.get('h5_file_path', 'data/%s.h5'%self.scene_name)
self.h5_file = h5py.File(self.h5_file_path, 'r')
self.locations = self.h5_file['location'][()]
self.rotations = self.h5_file['rotation'][()]
self.n_locations = self.locations.shape[0]
self.terminals = np.zeros(self.n_locations)
self.terminals[self.terminal_state_id] = 1
self.terminal_states, = np.where(self.terminals)
self.transition_graph = self.h5_file['graph'][()]
self.shortest_path_distances = self.h5_file['shortest_path_distance'][()]
self.history_length = HISTORY_LENGTH
self.screen_height = SCREEN_HEIGHT
self.screen_width = SCREEN_WIDTH
# we use pre-computed fc7 features from ResNet-50
# self.s_t = np.zeros([self.screen_height, self.screen_width, self.history_length])
self.s_t = np.zeros([2048, self.history_length])
self.s_t1 = np.zeros_like(self.s_t)
self.s_target = self._tiled_state(self.terminal_state_id)
self.reset()
# public methods
def reset(self):
# randomize initial state
while True:
k = random.randrange(self.n_locations)
min_d = np.inf
# check if target is reachable
for t_state in self.terminal_states:
dist = self.shortest_path_distances[k][t_state]
min_d = min(min_d, dist)
# min_d = 0 if k is a terminal state
# min_d = -1 if no terminal state is reachable from k
if min_d > 0: break
# reset parameters
self.current_state_id = k
self.s_t = self._tiled_state(self.current_state_id)
self.reward = 0
self.collided = False
self.terminal = False
def step(self, action):
assert not self.terminal, 'step() called in terminal state'
k = self.current_state_id
# id are just the indexes through the states, that's all. each elem is a state
if self.transition_graph[k][action] != -1:
self.current_state_id = self.transition_graph[k][action]
if self.terminals[self.current_state_id]:
self.terminal = True
self.collided = False
else:
self.terminal = False
self.collided = False
else:
self.terminal = False
self.collided = True
self.reward = self._reward(self.terminal, self.collided)
# s_t here, the curr_state, is actually the state before the step. self.state returns the state after the step. some weird
# combo of these is used to make s_t1 (the next state). Then after, step() is completed, update() is called where
# s_t <-- s_t1
# note that self.state() returns the ResNet feature, which is what we will need to get from the observation
self.s_t1 = np.append(self.s_t[:,1:], self.state, axis=1)
def update(self):
self.s_t = self.s_t1
# private methods
def _tiled_state(self, state_id):
k = random.randrange(self.n_feat_per_locaiton)
f = self.h5_file['resnet_feature'][state_id][k][:,np.newaxis] # f is some portion of the RestNet features
return np.tile(f, (1, self.history_length)) # Repeats f (1, self.history_length) times
def _reward(self, terminal, collided):
# positive reward upon task completion
if terminal: return 10.0
# time penalty or collision penalty
return -0.1 if collided else -0.01
# properties
@property
def action_size(self):
# move forward/backward, turn left/right for navigation
return ACTION_SIZE
@property
def action_definitions(self):
action_vocab = ["MoveForward", "RotateRight", "RotateLeft", "MoveBackward"] # TODO: whoops I think this is the order we want?
return action_vocab[:ACTION_SIZE]
@property
def observation(self): # this is the RGB image of the agent's first person view
return self.h5_file['observation'][self.current_state_id]
@property
def state(self):
# read from hdf5 cache
# "we extracted 10 features from the proximity of each location coordinates -- we sampled a small Gaussian offset from the (x,y) coordinates"
k = random.randrange(self.n_feat_per_locaiton)
return self.h5_file['resnet_feature'][self.current_state_id][k][:,np.newaxis]
@property
def target(self):
return self.s_target
@property
def x(self):
return self.locations[self.current_state_id][0]
@property
def z(self):
return self.locations[self.current_state_id][1]
@property
def r(self):
return self.rotations[self.current_state_id]
if __name__ == "__main__":
scene_name = 'bedroom_04'
env = THORDiscreteEnvironment({
'random_start': True,
'scene_name': scene_name,
'h5_file_path': 'data/%s.h5'%scene_name
})