-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrevised_DPPO.py
188 lines (161 loc) · 7.82 KB
/
revised_DPPO.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""
A simple version of OpenAI's Proximal Policy Optimization (PPO). [https://arxiv.org/abs/1707.06347]
Distributing workers in parallel to collect data, then stop worker's roll-out and train PPO on collected data.
Restart workers once PPO is updated.
The global PPO updating rule is adopted from DeepMind's paper (DPPO):
Emergence of Locomotion Behaviours in Rich Environments (Google Deepmind): [https://arxiv.org/abs/1707.02286]
View more on my tutorial website: https://morvanzhou.github.io/tutorials
Dependencies:
tensorflow r1.3
gym 0.9.2
"""
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import gym, threading, queue
EP_MAX = 1000
EP_LEN = 200
N_WORKER = 4 # parallel workers
GAMMA = 0.9 # reward discount factor
A_LR = 0.0001 # learning rate for actor
C_LR = 0.0002 # learning rate for critic
MIN_BATCH_SIZE = 64 # minimum batch size for updating PPO
UPDATE_STEP = 10 # loop update operation n-steps
EPSILON = 0.2 # for clipping surrogate objective
GAME = 'sumo-ants-v0'
# S_DIM, A_DIM = 3, 1 # state and action dimension
S_DIM, A_DIM = 137, 8
class PPO(object):
def __init__(self):
self.sess = tf.Session()
self.tfs = tf.placeholder(tf.float32, [None, S_DIM], 'state')
# critic
l1 = tf.layers.dense(self.tfs, 100, tf.nn.relu)
self.v = tf.layers.dense(l1, 1)
self.tfdc_r = tf.placeholder(tf.float32, [None, 1], 'discounted_r')
self.advantage = self.tfdc_r - self.v
self.closs = tf.reduce_mean(tf.square(self.advantage))
self.ctrain_op = tf.train.AdamOptimizer(C_LR).minimize(self.closs)
# actor
pi, pi_params = self._build_anet('pi', trainable=True)
oldpi, oldpi_params = self._build_anet('oldpi', trainable=False)
self.sample_op = tf.squeeze(pi.sample(1), axis=0) # operation of choosing action
self.update_oldpi_op = [oldp.assign(p) for p, oldp in zip(pi_params, oldpi_params)]
self.tfa = tf.placeholder(tf.float32, [None, A_DIM], 'action')
self.tfadv = tf.placeholder(tf.float32, [None, 1], 'advantage')
# ratio = tf.exp(pi.log_prob(self.tfa) - oldpi.log_prob(self.tfa))
ratio = pi.prob(self.tfa) / (oldpi.prob(self.tfa) + 1e-5)
surr = ratio * self.tfadv # surrogate loss
self.aloss = -tf.reduce_mean(tf.minimum( # clipped surrogate objective
surr,
tf.clip_by_value(ratio, 1. - EPSILON, 1. + EPSILON) * self.tfadv))
self.atrain_op = tf.train.AdamOptimizer(A_LR).minimize(self.aloss)
self.sess.run(tf.global_variables_initializer())
def update(self):
global GLOBAL_UPDATE_COUNTER
while not COORD.should_stop():
if GLOBAL_EP < EP_MAX:
UPDATE_EVENT.wait() # wait until get batch of data
self.sess.run(self.update_oldpi_op) # copy pi to old pi
data = [QUEUE.get() for _ in range(QUEUE.qsize())] # collect data from all workers
data = np.vstack(data)
s, a, r = data[:, :S_DIM], data[:, S_DIM: S_DIM + A_DIM], data[:, -1:]
adv = self.sess.run(self.advantage, {self.tfs: s, self.tfdc_r: r})
# update actor and critic in a update loop
[self.sess.run(self.atrain_op, {self.tfs: s, self.tfa: a, self.tfadv: adv}) for _ in range(UPDATE_STEP)]
[self.sess.run(self.ctrain_op, {self.tfs: s, self.tfdc_r: r}) for _ in range(UPDATE_STEP)]
UPDATE_EVENT.clear() # updating finished
GLOBAL_UPDATE_COUNTER = 0 # reset counter
ROLLING_EVENT.set() # set roll-out available
def _build_anet(self, name, trainable):
with tf.variable_scope(name):
l1 = tf.layers.dense(self.tfs, 200, tf.nn.relu, trainable=trainable)
mu = 2 * tf.layers.dense(l1, A_DIM, tf.nn.tanh, trainable=trainable)
sigma = tf.layers.dense(l1, A_DIM, tf.nn.softplus, trainable=trainable)
norm_dist = tf.distributions.Normal(loc=mu, scale=sigma)
params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope=name)
return norm_dist, params
def choose_action(self, s):
s = s[np.newaxis, :]
a = self.sess.run(self.sample_op, {self.tfs: s})[0]
return np.clip(a, -2, 2)
def get_v(self, s):
if s.ndim < 2: s = s[np.newaxis, :]
return self.sess.run(self.v, {self.tfs: s})[0, 0]
class Worker(object):
def __init__(self, wid):
self.wid = wid
self.env = gym.make(GAME).unwrapped
self.ppo = GLOBAL_PPO
def work(self):
global GLOBAL_EP, GLOBAL_RUNNING_R, GLOBAL_UPDATE_COUNTER
while not COORD.should_stop():
s = self.env.reset()
ep_r = 0
buffer_s, buffer_a, buffer_r = [], [], []
for t in range(EP_LEN):
if not ROLLING_EVENT.is_set(): # while global PPO is updating
ROLLING_EVENT.wait() # wait until PPO is updated
buffer_s, buffer_a, buffer_r = [], [], [] # clear history buffer, use new policy to collect data
a = self.ppo.choose_action(s)
s_, r, done, _ = self.env.step(a)
buffer_s.append(s)
buffer_a.append(a)
buffer_r.append((r + 8) / 8) # normalize reward, find to be useful
s = s_
ep_r += r
GLOBAL_UPDATE_COUNTER += 1 # count to minimum batch size, no need to wait other workers
if t == EP_LEN - 1 or GLOBAL_UPDATE_COUNTER >= MIN_BATCH_SIZE:
v_s_ = self.ppo.get_v(s_)
discounted_r = [] # compute discounted reward
for r in buffer_r[::-1]:
v_s_ = r + GAMMA * v_s_
discounted_r.append(v_s_)
discounted_r.reverse()
bs, ba, br = np.vstack(buffer_s), np.vstack(buffer_a), np.array(discounted_r)[:, np.newaxis]
buffer_s, buffer_a, buffer_r = [], [], []
QUEUE.put(np.hstack((bs, ba, br))) # put data in the queue
if GLOBAL_UPDATE_COUNTER >= MIN_BATCH_SIZE:
ROLLING_EVENT.clear() # stop collecting data
UPDATE_EVENT.set() # globalPPO update
if GLOBAL_EP >= EP_MAX: # stop training
COORD.request_stop()
break
# record reward changes, plot later
if len(GLOBAL_RUNNING_R) == 0:
GLOBAL_RUNNING_R.append(ep_r)
else:
GLOBAL_RUNNING_R.append(GLOBAL_RUNNING_R[-1] * 0.9 + ep_r * 0.1)
GLOBAL_EP += 1
print('{0:.1f}%'.format(GLOBAL_EP / EP_MAX * 100), '|W%i' % self.wid, '|Ep_r: %.2f' % ep_r, )
if __name__ == '__main__':
GLOBAL_PPO = PPO()
UPDATE_EVENT, ROLLING_EVENT = threading.Event(), threading.Event()
UPDATE_EVENT.clear() # not update now
ROLLING_EVENT.set() # start to roll out
workers = [Worker(wid=i) for i in range(N_WORKER)]
GLOBAL_UPDATE_COUNTER, GLOBAL_EP = 0, 0
GLOBAL_RUNNING_R = []
COORD = tf.train.Coordinator()
QUEUE = queue.Queue() # workers putting data in this queue
threads = []
for worker in workers: # worker threads
t = threading.Thread(target=worker.work, args=())
t.start() # training
threads.append(t)
# add a PPO updating thread
threads.append(threading.Thread(target=GLOBAL_PPO.update, ))
threads[-1].start()
COORD.join(threads)
# plot reward change and test
plt.plot(np.arange(len(GLOBAL_RUNNING_R)), GLOBAL_RUNNING_R)
plt.xlabel('Episode');
plt.ylabel('Moving reward');
# plt.ion();
plt.show()
env = gym.make('sumo-ants-v0')
while True:
s = env.reset()
for t in range(300):
env.render()
s = env.step(GLOBAL_PPO.choose_action(s))[0]