my_learning_agent.py
from __future__ import print_function
import argparse
import gymnasium as gym
import numpy as np
# from gymnasium import logger
from _policies import BinaryActionLinearPolicy # Different file so it can be unpickled
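
# For reference, a minimal sketch of the interface this script assumes from
# BinaryActionLinearPolicy (illustrative only; the real class lives in _policies.py):
#
#     class BinaryActionLinearPolicy:
#         def __init__(self, theta):
#             self.w, self.b = theta[:-1], theta[-1]  # the extra parameter acts as a bias
#         def act(self, ob):
#             return int(ob.dot(self.w) + self.b < 0)  # threshold a linear score into {0, 1}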


def cem(f, th_mean, batch_size, n_iter, elite_frac, initial_std=1.0):
    """
    Generic implementation of the cross-entropy method for maximizing a black-box function

    f: a function mapping from vector -> scalar
    th_mean: initial mean over input distribution
    batch_size: number of samples of theta to evaluate per batch
    n_iter: number of batches
    elite_frac: each batch, select this fraction of the top-performing samples
    initial_std: initial standard deviation over parameter vectors
    """
    n_elite = int(np.round(batch_size * elite_frac))
    th_std = np.ones_like(th_mean) * initial_std

    for _ in range(n_iter):
        ths = np.array([th_mean + dth for dth in th_std[None, :] * np.random.randn(batch_size, th_mean.size)])
        ys = np.array([f(th) for th in ths])
        elite_inds = ys.argsort()[::-1][:n_elite]
        elite_ths = ths[elite_inds]
        th_mean = elite_ths.mean(axis=0)
        th_std = elite_ths.std(axis=0)
        yield {'ys': ys, 'theta_mean': th_mean, 'y_mean': ys.mean()}
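
# A minimal sketch of driving cem() on a toy objective (not part of the script; the
# objective and numbers are purely illustrative):
#
#     for out in cem(lambda th: -np.sum((th - 3.0) ** 2), np.zeros(5),
#                    batch_size=50, n_iter=20, elite_frac=0.2):
#         print(out['y_mean'])  # typically climbs towards 0 as th_mean approaches 3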


def do_rollout(agent, env, num_steps, render=False):
    total_rew = 0
    ob, _ = env.reset()
    for t in range(num_steps):
        a = agent.act(ob)
        (ob, reward, terminated, truncated, _info) = env.step(a)
        # terminated: the environment hit a terminal state; truncated: a time limit
        # cut the episode short. Either one ends the rollout.
        done = np.logical_or(terminated, truncated)
        total_rew += reward
        if render and t % 3 == 0:
            env.render()
        if done:
            break
    return total_rew, t + 1


if __name__ == '__main__':
    # logger.set_level(logger.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument('--display', action='store_true')
    parser.add_argument('target', nargs="?", default="CartPole-v0")
    args = parser.parse_args()

    # Only open a viewer window when --display is passed; otherwise train headless.
    env = gym.make(args.target, render_mode='human' if args.display else None)
    np.random.seed(0)
    params = dict(n_iter=100, batch_size=10, elite_frac=0.2)
    num_steps = 200
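
    # A single rollout from a random initial state, so each call gives a noisy
    # estimate of the policy's expected return for the given parameter vector.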
    def noisy_evaluation(theta):
        agent = BinaryActionLinearPolicy(theta)
        rew, T = do_rollout(agent, env, num_steps)
        return rew
    # Train the agent, and snapshot each stage
    for (i, iterdata) in enumerate(cem(noisy_evaluation, np.zeros(env.observation_space.shape[0] + 1), **params)):
        print('Iteration %2i. Episode mean reward: %7.3f' % (i, iterdata['y_mean']))
        agent = BinaryActionLinearPolicy(iterdata['theta_mean'])
        do_rollout(agent, env, num_steps, render=args.display)

    env.close()
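
# Example invocations (requires _policies.py next to this file; the environment id
# defaults to CartPole-v0 and can be replaced with another registered id such as
# CartPole-v1):
#
#     python my_learning_agent.py
#     python my_learning_agent.py --display
#     python my_learning_agent.py CartPole-v1 --display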