my_pg.py
"""
learn from movan
refrence:https://morvanzhou.github.io/tutorials/machine-learning/reinforcement-learning/5-1-policy-gradient-softmax1/
"""
import numpy as np
import tensorflow as tf

# reproducibility (TensorFlow 1.x graph-mode API is used throughout)
np.random.seed(1)
tf.set_random_seed(1)

class PolicyGradient:
    def __init__(
            self,
            n_actions,
            n_features,
            learning_rate=0.01,
            reward_decay=0.95,
            output_graph=False,
    ):
        self.n_actions = n_actions
        self.n_features = n_features
        self.lr = learning_rate
        self.gamma = reward_decay

        # per-episode buffers: observations, actions, rewards
        self.ep_obs, self.ep_as, self.ep_rs = [], [], []

        self._build_net()

        self.sess = tf.Session()
        if output_graph:
            # write the graph for TensorBoard: `tensorboard --logdir=logs`
            tf.summary.FileWriter("logs/", self.sess.graph)
        self.sess.run(tf.global_variables_initializer())
    def _build_net(self):
        with tf.name_scope("inputs"):
            self.tf_obs = tf.placeholder(tf.float32, [None, self.n_features], name="observations")
            # actions must be integer indices for the sparse cross-entropy below
            self.tf_acts = tf.placeholder(tf.int32, [None, ], name="actions_num")
            self.tf_vt = tf.placeholder(tf.float32, [None, ], name="actions_value")

        # fc1: hidden layer
        layer = tf.layers.dense(
            inputs=self.tf_obs,
            units=10,
            activation=tf.nn.tanh,
            kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3),
            bias_initializer=tf.constant_initializer(0.1),
            name='fc1'
        )
        # fc2: one logit per action
        all_act = tf.layers.dense(
            inputs=layer,
            units=self.n_actions,
            activation=None,
            kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3),
            bias_initializer=tf.constant_initializer(0.1),
            name='fc2'
        )
        # softmax turns the logits into action probabilities
        self.all_act_prob = tf.nn.softmax(all_act, name='act_prob')

        with tf.name_scope('loss'):
            # cross-entropy gives -log(pi(a_t|s_t)); weighting it by the
            # discounted return vt yields the REINFORCE surrogate loss
            neg_log_prob = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=all_act, labels=self.tf_acts)
            loss = tf.reduce_mean(neg_log_prob * self.tf_vt)

        with tf.name_scope('train'):
            self.train_op = tf.train.AdamOptimizer(self.lr).minimize(loss)
    def choose_action(self, observation):
        # forward pass for a single observation, then sample an action
        # from the current policy's probability distribution
        prob_weights = self.sess.run(self.all_act_prob, feed_dict={self.tf_obs: observation[np.newaxis, :]})
        action = np.random.choice(range(prob_weights.shape[1]), p=prob_weights.ravel())
        return action
    def store_transition(self, s, a, r):
        # buffer one (state, action, reward) step of the current episode
        self.ep_obs.append(s)
        self.ep_as.append(a)
        self.ep_rs.append(r)
    def learn(self):
        # discount and normalize the rewards of the finished episode
        discounted_ep_rs_norm = self._discount_and_norm_rewards()

        # one gradient step over the whole episode
        self.sess.run(self.train_op, feed_dict={
            self.tf_obs: np.vstack(self.ep_obs),      # shape=[None, n_features]
            self.tf_acts: np.array(self.ep_as),       # shape=[None, ]
            self.tf_vt: discounted_ep_rs_norm,        # shape=[None, ]
        })

        # clear the episode buffers
        self.ep_obs, self.ep_as, self.ep_rs = [], [], []
        return discounted_ep_rs_norm
    def _discount_and_norm_rewards(self):
        # compute the discounted return G_t = r_t + gamma * G_{t+1} by scanning
        # the reward list backwards (float dtype so integer rewards are not truncated)
        discounted_ep_rs = np.zeros_like(self.ep_rs, dtype=np.float64)
        running_add = 0
        for t in reversed(range(0, len(self.ep_rs))):
            running_add = running_add * self.gamma + self.ep_rs[t]
            discounted_ep_rs[t] = running_add

        # normalize the returns to reduce the variance of the gradient estimate
        discounted_ep_rs -= np.mean(discounted_ep_rs)
        discounted_ep_rs /= np.std(discounted_ep_rs)
        return discounted_ep_rs
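

# ---------------------------------------------------------------------------
# Usage sketch (not from the original file): a minimal REINFORCE training
# loop around the class above. It assumes OpenAI Gym's CartPole-v0 and the
# classic gym API where env.reset() returns an observation and env.step()
# returns 4 values (gym < 0.26); the episode count and hyperparameters are
# illustrative only.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import gym

    env = gym.make("CartPole-v0")
    agent = PolicyGradient(
        n_actions=env.action_space.n,
        n_features=env.observation_space.shape[0],
        learning_rate=0.02,
        reward_decay=0.99,
    )

    for episode in range(300):
        observation = env.reset()
        while True:
            action = agent.choose_action(observation)
            observation_, reward, done, info = env.step(action)
            agent.store_transition(observation, action, reward)
            observation = observation_
            if done:
                ep_return = sum(agent.ep_rs)   # total reward before buffers are cleared
                agent.learn()                  # one policy-gradient update per episode
                print("episode:", episode, "return:", ep_return)
                break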