# q-learning.py
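# Tabular Q-learning on the EnvMap grid environment: defines a QLearning agent,
# a run_trial() training loop with a decaying epsilon schedule, and (commented out
# below) a set of experiments that sweep alpha, the default Q-value, the reward
# specification and the discount parameter on a board with a single obstacle.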
import random
import math
from EnvMap import EnvMap
from plot import plot_line, compare_lines_lat, mean_squared_error
from Simulator import Simulator
random.seed(42)
class QLearning:
    """Tabular Q-learning agent on an EnvMap grid: Q-values are kept in
    self.q_values, keyed by (state, action) and initialised to default_q_value,
    and actions are chosen epsilon-greedily (epsilon is set by run_trial)."""
    def __init__(self, num_grid_rows=3, num_grid_cols=3, num_crates=3, obstacles={}, default_q_value=50.0):
        self.num_grid_rows = num_grid_rows
        self.num_grid_cols = num_grid_cols
        self.num_crates = num_crates
        self.slidingMap = EnvMap(rows=self.num_grid_rows, cols=self.num_grid_cols)
        # Initial distribution of the pitchfork 'p1' over column 0 (row 3 only exists on boards with 4+ rows).
        self.init_pitchfork_distribution = {'p1': {(0, 0): 0.2, (1, 0): 0.2, (2, 0): 0.2, (3, 0): 0 if self.num_grid_rows < 4 else 0.2}}
        self.alpha = 0.9  # learning rate
        self.epsilon = 0.0  # exploration rate (overwritten per episode by run_trial)
        self.discount_param = 1  # discount factor (gamma)
        self.default_q_value = default_q_value
        self.slidingMap.MOVE_REWARD = -1
        self.slidingMap.GOAL_REWARD = 10
        self.init_board = {
            'obstacles': obstacles,
            'goals': {'g1': (1, self.num_grid_cols-1)},
        }
        self.q_values = {}
        self.get_initial_state()
        self.initialise_q_values()
    def initialise_q_values(self):
        for state in self.slidingMap.states:
            for action in self.slidingMap.actions:
                self.q_values[(state, action)] = self.default_q_value
    def get_initial_state(self):
        pitchfork_state_map = {}
        for entity in self.init_pitchfork_distribution.keys():
            positions = list(self.init_pitchfork_distribution[entity].keys())
            probabilities = list(self.init_pitchfork_distribution[entity].values())
            chosen_position = random.choices(positions, probabilities)[0]
            pitchfork_state_map[entity] = chosen_position
        crate_state_map = {}
        # Crates spawn anywhere outside column 0 (the pitchfork's starting column) and off obstacle cells.
        allowed_spots = random.sample([(row, col) for row in range(0, self.num_grid_rows) for col in range(1, self.num_grid_cols) if (row, col) not in self.init_board['obstacles'].values()], self.num_crates)
        for i in range(0, self.num_crates):
            crate_id = 'c' + str(i)
            crate_state_map[crate_id] = allowed_spots[i]
        self.slidingMap.setup_board(self.init_board['obstacles'], crate_state_map, pitchfork_state_map, self.init_board['goals'])
        return self.slidingMap.getCurrentState()
    def get_epsilon_greedy_action_probabilities(self, current_state, epsilon=0.0):
        # Every action gets a base probability of epsilon / |A|; the remaining
        # (1 - epsilon) mass is split evenly among the highest-valued actions.
        highest_q_value = -math.inf
        probs = []
        best_actions_idx = []
        base_prob = epsilon / len(self.slidingMap.actions)
        for idx, action in enumerate(self.slidingMap.actions):
            probs.append(base_prob)
            q_value = self.q_values[(current_state, action)]
            if q_value > highest_q_value:
                highest_q_value = q_value
                best_actions_idx = [idx]
            elif q_value == highest_q_value:
                best_actions_idx.append(idx)
        for best_action_idx in best_actions_idx:
            probs[best_action_idx] = (1 - epsilon)/len(best_actions_idx) + base_prob
        return probs
    def get_next_action(self, current_state, epsilon):
        return random.choices(self.slidingMap.actions, weights=self.get_epsilon_greedy_action_probabilities(current_state, epsilon))[0]
    def get_next_state(self, _current_state, current_action):
        return self.slidingMap.computeNextState(current_action)
    def get_reward(self, _current_state, _current_action, _next_state):
        return self.slidingMap.computeReward()
    def get_greedy_policy(self):
        policy = {}
        for state in self.slidingMap.states:
            policy[(state, self.get_next_action(state, 0.0))] = 1.0
        return policy
    def run_episode(self, max_episode_length=None):
        current_state = self.get_initial_state()
        episode_length = 0
        cumulative_reward = 0
        while True:
            if self.slidingMap.isTerminalState() or (max_episode_length is not None and episode_length == max_episode_length):
                break
            episode_length += 1
            current_action = self.get_next_action(current_state, self.epsilon)
            next_state = self.get_next_state(current_state, current_action)
            reward = self.get_reward(current_state, current_action, next_state)
            cumulative_reward += reward
            best_next_action = self.get_next_action(next_state, 0.0)  # greedy choice, so next_q_value is max_a' Q(s', a')
            current_q_value = self.q_values[(current_state, current_action)]
            next_q_value = self.q_values[(next_state, best_next_action)]
            # Q-learning update: Q(s, a) <- Q(s, a) + alpha * (r + gamma * max_a' Q(s', a') - Q(s, a))
            self.q_values[(current_state, current_action)] = current_q_value + self.alpha*(reward + self.discount_param*next_q_value - current_q_value)
            current_state = next_state
        return episode_length, cumulative_reward
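# Worked example of run_episode's update with the defaults above (alpha=0.9,
# discount_param=1, MOVE_REWARD=-1, Q-values initialised to 50): for a non-goal
# step where both Q(s, a) and max_a' Q(s', a') are still 50,
#     Q(s, a) <- 50 + 0.9 * (-1 + 1*50 - 50) = 49.1.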
def run_trial(qLearner, descriptor="", maxEpisodeCount=500, maxEpisodeLength=500, show_results=False):
    q_values_hist = []
    episode_length_hist = []
    trajectory_history = []
    rewards_hist = []
    # The first episode is fully exploratory (epsilon = 1); later episodes decay epsilon as 1/k.
    print("[Episode 1] Running...")
    qLearner.epsilon = 1.0
    episode_length, cumulative_reward = qLearner.run_episode(maxEpisodeLength)
    episode_length_hist.append(episode_length)
    rewards_hist.append(cumulative_reward)
    q_values_hist.append(qLearner.q_values.copy())
    trajectory_history.append(qLearner.slidingMap.trajectory)
    if show_results:
        qLearner.slidingMap.simulateTrajectory(100, 2500)
    for episode_idx in range(1, maxEpisodeCount):
        print(f"[Episode {episode_idx + 1}] Running...")
        qLearner.epsilon = 1/(episode_idx+1)
        episode_length, cumulative_reward = qLearner.run_episode(maxEpisodeLength)
        episode_length_hist.append(episode_length)
        rewards_hist.append(cumulative_reward)
        q_values_hist.append(qLearner.q_values.copy())
        trajectory_history.append(qLearner.slidingMap.trajectory)
    cumulative_episode_lengths = [0]
    for episode_length in episode_length_hist:
        cumulative_episode_lengths.append(cumulative_episode_lengths[-1] + episode_length)
    if show_results:
        plot_line(cumulative_episode_lengths, range(0, maxEpisodeCount+1), "Timesteps", "Episode Counts", f"Q-Learning - Timesteps vs Episode Counts ({descriptor})", show_results)
    # Mean squared change in the Q-table between consecutive episodes, used as a convergence measure.
    deviation_over_episodes = []
    for idx in range(1, len(q_values_hist)):
        deviation_over_episodes.append(mean_squared_error(q_values_hist[idx], q_values_hist[idx-1]))
    if show_results:
        qLearner.slidingMap.simulateTrajectory(100, 2500)
    return cumulative_episode_lengths, deviation_over_episodes, rewards_hist
# qLearner = QLearning()
# run_trial(qLearner, "basic", show_results=True)
# sim = Simulator(qLearner.slidingMap)
# sim.simulate(qLearner.get_greedy_policy())
# sim.visualize(500, 2000)
# qLearner = QLearning(4, 4, 2)
# run_trial(qLearner, "basic - large and simple", maxEpisodeCount=1000, show_results=True)
# qLearner = QLearning(num_crates=4)
# run_trial(qLearner, "basic - complex", maxEpisodeCount=800, show_results=True)
# qLearner = QLearning(obstacles={'o1': (1, 1)})
# run_trial(qLearner, "basic - obstacle", maxEpisodeCount=1000, show_results=True)
# ## Basic Obstacle Experiment - for Alpha ##
# MAX_EPISODES_COUNTS = 1000
# alpha_candidates = [1, 9e-1, 5e-1, 1e-1, 5e-2, 1e-2, 5e-3, 1e-3, 5e-4]
# MAX_TRIALS = len(alpha_candidates)
# MAX_SAMPLES = 10
# cumulative_episode_lengths_by_alpha = []
# for trial_idx in range(0, MAX_TRIALS):
#     cumulative_episode_lengths_by_sample = []
#     for sample_idx in range(0, MAX_SAMPLES):
#         print(f'[Sample {sample_idx}]')
#         qLearner = QLearning(obstacles={'o1': (1, 1)})
#         qLearner.alpha = alpha_candidates[trial_idx]
#         cumulative_episode_lengths, deviation_over_episodes, _rewards_hist = run_trial(qLearner, "basic - obstacle", maxEpisodeCount=MAX_EPISODES_COUNTS)
#         cumulative_episode_lengths_by_sample.append(cumulative_episode_lengths)
#     mean_cumulative_episode_lengths = []
#     for episode_idx in range(0, MAX_EPISODES_COUNTS+1):
#         mean_cumulative_episode_lengths.append(sum([cumulative_episode_lengths[episode_idx] for cumulative_episode_lengths in cumulative_episode_lengths_by_sample])/float(MAX_SAMPLES))
#     cumulative_episode_lengths_by_alpha.append(mean_cumulative_episode_lengths)
# compare_lines_lat(cumulative_episode_lengths_by_alpha, range(0, MAX_EPISODES_COUNTS+1), "Timesteps", "Episode Counts", "Q Learning - Timesteps vs Episode Counts - Obstacle - Alpha Variations", alpha_candidates, True)
# ## Basic Obstacle Experiment - for Default Q-Value ##
# MAX_EPISODES_COUNTS = 1000
# default_q_vals_candidates = [0.0, 50, -50, 5e3, -5e3, 5e5, -5e5]
# MAX_TRIALS = len(default_q_vals_candidates)
# MAX_SAMPLES = 10
# cumulative_episode_lengths_by_q_vals = []
# for trial_idx in range(0, MAX_TRIALS):
#     cumulative_episode_lengths_by_sample = []
#     for sample_idx in range(0, MAX_SAMPLES):
#         print(f'[Sample {sample_idx}]')
#         qLearner = QLearning(obstacles={'o1': (1, 1)}, default_q_value=default_q_vals_candidates[trial_idx])
#         cumulative_episode_lengths, deviation_over_episodes, _rewards_hist = run_trial(qLearner, "basic - obstacle", maxEpisodeCount=MAX_EPISODES_COUNTS)
#         cumulative_episode_lengths_by_sample.append(cumulative_episode_lengths)
#     mean_cumulative_episode_lengths = []
#     for episode_idx in range(0, MAX_EPISODES_COUNTS+1):
#         mean_cumulative_episode_lengths.append(sum([cumulative_episode_lengths[episode_idx] for cumulative_episode_lengths in cumulative_episode_lengths_by_sample])/float(MAX_SAMPLES))
#     cumulative_episode_lengths_by_q_vals.append(mean_cumulative_episode_lengths)
# compare_lines_lat(cumulative_episode_lengths_by_q_vals, range(0, MAX_EPISODES_COUNTS+1), "Timesteps", "Episode Counts", "Q Learning - Timesteps vs Episode Counts - Obstacle - Default Q Val Variations", default_q_vals_candidates, True)
# ## Basic Obstacle Experiment - for Reward Functions ##
# MAX_EPISODES_COUNTS = 1000
# reward_spec_candidates = [(-1, 0), (0, 10), (-1, 10), (-5, 10)]
# MAX_TRIALS = len(reward_spec_candidates)
# MAX_SAMPLES = 10
# cumulative_episode_lengths_by_reward_spec = []
# for trial_idx in range(0, MAX_TRIALS):
#     cumulative_episode_lengths_by_sample = []
#     for sample_idx in range(0, MAX_SAMPLES):
#         print(f'[Sample {sample_idx}]')
#         qLearner = QLearning(obstacles={'o1': (1, 1)})
#         qLearner.slidingMap.MOVE_REWARD = reward_spec_candidates[trial_idx][0]
#         qLearner.slidingMap.GOAL_REWARD = reward_spec_candidates[trial_idx][1]
#         cumulative_episode_lengths, deviation_over_episodes, _rewards_hist = run_trial(qLearner, "basic - obstacle", maxEpisodeCount=MAX_EPISODES_COUNTS)
#         cumulative_episode_lengths_by_sample.append(cumulative_episode_lengths)
#     mean_cumulative_episode_lengths = []
#     for episode_idx in range(0, MAX_EPISODES_COUNTS+1):
#         mean_cumulative_episode_lengths.append(sum([cumulative_episode_lengths[episode_idx] for cumulative_episode_lengths in cumulative_episode_lengths_by_sample])/float(MAX_SAMPLES))
#     cumulative_episode_lengths_by_reward_spec.append(mean_cumulative_episode_lengths)
# compare_lines_lat(cumulative_episode_lengths_by_reward_spec, range(0, MAX_EPISODES_COUNTS+1), "Timesteps", "Episode Counts", "Q Learning - Timesteps vs Episode Counts - Obstacle - Reward Spec Variations", [f"{spec[0]}_{spec[1]}" for spec in reward_spec_candidates], True)
# ## Basic Obstacle Experiment - for Discount Parameter ##
# MAX_EPISODES_COUNTS = 1000
# discount_param_candidates = [1, 0.9, 0.75, 0.5, 0.25, 0.0]
# MAX_TRIALS = len(discount_param_candidates)
# MAX_SAMPLES = 10
# cumulative_episode_lengths_by_discount_param = []
# for trial_idx in range(0, MAX_TRIALS):
#     cumulative_episode_lengths_by_sample = []
#     for sample_idx in range(0, MAX_SAMPLES):
#         print(f'[Sample {sample_idx}]')
#         qLearner = QLearning(obstacles={'o1': (1, 1)})
#         qLearner.discount_param = discount_param_candidates[trial_idx]
#         cumulative_episode_lengths, deviation_over_episodes, _rewards_hist = run_trial(qLearner, "basic - obstacle", maxEpisodeCount=MAX_EPISODES_COUNTS)
#         cumulative_episode_lengths_by_sample.append(cumulative_episode_lengths)
#     mean_cumulative_episode_lengths = []
#     for episode_idx in range(0, MAX_EPISODES_COUNTS+1):
#         mean_cumulative_episode_lengths.append(sum([cumulative_episode_lengths[episode_idx] for cumulative_episode_lengths in cumulative_episode_lengths_by_sample])/float(MAX_SAMPLES))
#     cumulative_episode_lengths_by_discount_param.append(mean_cumulative_episode_lengths)
# compare_lines_lat(cumulative_episode_lengths_by_discount_param, range(0, MAX_EPISODES_COUNTS+1), "Timesteps", "Episode Counts", "Q Learning - Timesteps vs Episode Counts - Obstacle - Discount Param Variations", discount_param_candidates, True)
# ## Basic Obstacle Experiment ##
# MAX_SAMPLES = 20
# MAX_EPISODES_COUNTS = 1000
# cumulative_episode_lengths_over_samples = []
# deviation_between_episodes_over_samples = []
# rewards_hist_over_samples = []
# for sample_idx in range(0, MAX_SAMPLES):
#     print(f'[Sample {sample_idx}]')
#     qLearner = QLearning(obstacles={'o1': (1, 1)})
#     cumulative_episode_lengths, deviation_over_episodes, rewards_hist = run_trial(qLearner, "basic - obstacle", maxEpisodeCount=MAX_EPISODES_COUNTS)
#     cumulative_episode_lengths_over_samples.append(cumulative_episode_lengths)
#     deviation_between_episodes_over_samples.append(deviation_over_episodes)
#     rewards_hist_over_samples.append(rewards_hist)
# mean_cumulative_episode_lengths = []
# mean_deviation_between_samples = []
# mean_rewards_hist = []
# for episode_idx in range(0, MAX_EPISODES_COUNTS):
#     mean_cumulative_episode_lengths.append(sum([cumulative_episode_lengths[episode_idx] for cumulative_episode_lengths in cumulative_episode_lengths_over_samples])/float(MAX_SAMPLES))
#     mean_rewards_hist.append(sum([rewards_hist[episode_idx] for rewards_hist in rewards_hist_over_samples])/float(MAX_SAMPLES))
# for episode_idx in range(0, MAX_EPISODES_COUNTS-1):
#     mean_deviation_between_samples.append(sum([deviation_over_episodes[episode_idx] for deviation_over_episodes in deviation_between_episodes_over_samples])/float(MAX_SAMPLES))
# plot_line(mean_cumulative_episode_lengths, range(0, MAX_EPISODES_COUNTS), "Timesteps", "Episode Counts", "Q Learning - Cumulative timesteps vs Episode Counts - Obstacle", True)
# plot_line(range(1, MAX_EPISODES_COUNTS), mean_deviation_between_samples, "Episode Counts", "MSE Difference from previous episode q values", "Q Learning - MSE difference from previous Q Values - Obstacle", True)
# plot_line(range(0, MAX_EPISODES_COUNTS), mean_rewards_hist, "Episode Counts", "Average Rewards", "Q Learning - Average Rewards - Obstacle", True)
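# Optional inspection sketch (kept commented out like the experiments above; the
# variable names and episode count are illustrative choices, not values used in
# the experiments): train one learner and print the greedy action chosen for each
# state via the class's own get_greedy_policy() helper.
# learner = QLearning(obstacles={'o1': (1, 1)})
# run_trial(learner, "greedy policy demo", maxEpisodeCount=200)
# for (state, action), prob in learner.get_greedy_policy().items():
#     print(state, "->", action)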