qlearning_gym_3.py
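
# Tabular Q-learning on Gym's MountainCar-v0: the continuous observation space is
# discretized into a 20x20 grid, actions are chosen epsilon-greedily, and per-episode
# reward statistics are aggregated and plotted at the end of training.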
import gym
import numpy as np
import matplotlib.pyplot as plt
env = gym.make("MountainCar-v0")
env.reset()
LEARNING_RATE = 0.1
# DISCOUNT weights how much we value future rewards over the immediate reward
DISCOUNT = 0.95
EPISODES = 20000
SHOW_EVERY = 500
STATS_EVERY = 100
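# Render every SHOW_EVERY-th episode so training stays fast, and aggregate reward
# statistics every STATS_EVERY episodes for the plot at the end.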
DISCRETE_OS_SIZE = [20] * len(env.observation_space.high)
discrete_os_win_size = (env.observation_space.high - env.observation_space.low) / DISCRETE_OS_SIZE
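# MountainCar-v0 observations are (position, velocity), so this builds a 20x20 grid;
# discrete_os_win_size is the width of one bucket along each dimension.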
# chance of doing random action
epsilon = 1.0  # not a constant, going to be decayed
START_EPSILON_DECAYING = 1
END_EPSILON_DECAYING = EPISODES // 2
# amount to decay each episode
epsilon_decay_value = epsilon / (END_EPSILON_DECAYING - START_EPSILON_DECAYING)
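# With these settings epsilon falls linearly from 1.0 to roughly 0 by episode
# EPISODES // 2 = 10000, i.e. a decay of about 1e-4 per episode.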
q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))
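# Shape is 20 x 20 x 3 (three actions: push left, no push, push right); random values
# in [-2, 0) roughly match the -1 reward received on every step before reaching the goal.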
ep_rewards = []
aggr_ep_rewards = {'ep': [], 'avg': [], 'max': [], 'min': []}
def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low) / discrete_os_win_size
    # this tuple is used to look up the 3 Q values for the available actions in the q-table
    return tuple(discrete_state.astype(int))
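# For example, the reset state (position around -0.5, velocity 0) lands roughly in the
# middle of the grid, near bucket (7, 10).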
for episode in range(EPISODES):
    episode_reward = 0
    discrete_state = get_discrete_state(env.reset())
    done = False

    if episode % SHOW_EVERY == 0:
        render = True
        print(episode)
    else:
        render = False

    while not done:
        if np.random.random() > epsilon:
            # Exploit: take the best known action from the Q table
            action = np.argmax(q_table[discrete_state])
        else:
            # Explore: take a random action
            action = np.random.randint(0, env.action_space.n)

        new_state, reward, done, _ = env.step(action)
        episode_reward += reward
        new_discrete_state = get_discrete_state(new_state)

        if render:
            env.render()

        if not done:
            # Maximum possible Q value in the next step (for the new state)
            max_future_q = np.max(q_table[new_discrete_state])
            # Current Q value (for the current state and the performed action)
            current_q = q_table[discrete_state + (action,)]
            # Q-learning update: Q(s, a) <- (1 - lr) * Q(s, a) + lr * (reward + discount * max_a' Q(s', a'))
            new_q = (1 - LEARNING_RATE) * current_q + \
                LEARNING_RATE * (reward + DISCOUNT * max_future_q)
            # Update the Q table with the new Q value
            q_table[discrete_state + (action,)] = new_q
        # Simulation ended (for any reason) - if the goal position was reached,
        # set the Q value to the best possible value (0) directly
        elif new_state[0] >= env.goal_position:
            print(f'Made it on episode {episode}')
            q_table[discrete_state + (action,)] = 0

        discrete_state = new_discrete_state

    if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
        epsilon -= epsilon_decay_value

    ep_rewards.append(episode_reward)
    if not episode % STATS_EVERY:
        # np.save(f"qtables/{episode}-qtable.npy", q_table)
        average_reward = sum(ep_rewards[-STATS_EVERY:]) / STATS_EVERY
        aggr_ep_rewards['ep'].append(episode)
        aggr_ep_rewards['avg'].append(average_reward)
        aggr_ep_rewards['max'].append(max(ep_rewards[-STATS_EVERY:]))
        aggr_ep_rewards['min'].append(min(ep_rewards[-STATS_EVERY:]))
env.close()
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['avg'], label="average rewards")
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['max'], label="max rewards")
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['min'], label="min rewards")
plt.legend(loc=4)
plt.show()