DQN.py
import torch
import copy
from collections import deque
import numpy as np
import random
import torch.nn as nn
from config import args
from model import Model
import torch.nn.functional as F


class DQNAgent:
    """
    Class that defines the functions required for training the DQN agent.
    """
    def __init__(self, n_states, n_actions, batch_size=args.batch_size, gamma=args.gamma,
                 eps_max=args.eps_max, lr=args.lr, N=args.N, eps_end=args.eps_end,
                 eps_decay=args.eps_decay):
        self.gamma = gamma
        self.n_states = n_states
        self.n_actions = n_actions

        # for the epsilon-greedy exploration strategy
        self.eps_max = eps_max
        self.eps_decay = eps_decay
        self.eps_end = eps_end

        self.lr = lr
        self.memory = N
        self.batch_size = batch_size

        # instances of the network for the current policy and its target
        self.q_net = Model(self.n_states, self.n_actions)
        self.tg_net = copy.deepcopy(self.q_net)

        self.criteria = torch.nn.MSELoss()
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=self.lr)

        # instance of the replay buffer
        self.buffer = deque(maxlen=self.memory)
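
    # --------------------------------------------------------------------------
    # Illustrative sketch, not part of the original file: one way to use the
    # eps_max / eps_end / eps_decay values stored above for epsilon-greedy
    # action selection. It assumes Model.policy(state) returns a tensor of
    # Q-values, as in train() below; the exact decay schedule defined in the
    # project's config may differ from the exponential one used here.
    # --------------------------------------------------------------------------
    def select_action(self, state, step):
        """
        Return a random action with probability epsilon (annealed with the
        global step counter), otherwise the greedy action of the online network.
        """
        epsilon = self.eps_end + (self.eps_max - self.eps_end) * np.exp(-1.0 * step / self.eps_decay)
        if random.random() < epsilon:
            return random.randrange(self.n_actions)
        with torch.no_grad():
            state_t = torch.FloatTensor(np.float32(state)).unsqueeze(0)
            q_values = self.q_net.policy(state_t)
            return int(q_values.argmax(dim=1).item())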

    def insert(self, state, action, reward, next_state, done):
        """
        Store the transition (state, action, reward, next_state, done) in the
        replay buffer.

        Parameters
        ----------
        state: np.ndarray or float
            Current state.
        action: int
            Action taken in the current state.
        reward: float
            Reward received for the transition.
        next_state: np.ndarray or float
            State reached after taking the action.
        done: bool
            Whether next_state is a terminal state.
        """
        state = np.expand_dims(state, 0)
        next_state = np.expand_dims(next_state, 0)
        self.buffer.append((state, action, reward, next_state, done))

    def buffer_size(self):
        """
        Return the current number of transitions stored in the replay buffer.
        """
        return len(self.buffer)

    def sample_Buffer(self, m):
        """
        Sample m transitions uniformly at random from the replay buffer, where
        m is normally the batch size.

        Parameters
        ----------
        m: int
            Number of transitions to sample from the memory.

        Returns
        -------
        A batch of transitions sampled from the memory: stacked states, actions,
        rewards, stacked next states and done flags.
        """
        state, action, reward, next_state, done = zip(*random.sample(self.buffer, m))
        return np.concatenate(state), action, reward, np.concatenate(next_state), done

    def train(self):
        """
        Sample a batch from the replay buffer and take one gradient step on the
        temporal-difference loss between the online network and the Bellman target.
        """
        s_batch, a_batch, r_batch, s_n_batch, done_batch = self.sample_Buffer(self.batch_size)

        s_batch = torch.FloatTensor(np.float32(s_batch))
        a_batch = torch.LongTensor(a_batch)
        s_n_batch = torch.FloatTensor(np.float32(s_n_batch))
        r_batch = torch.FloatTensor(r_batch)
        done_batch = torch.FloatTensor(done_batch)

        # Q-values of the actions actually taken, from the online network
        q_values = self.q_net.policy(s_batch)
        q_value = q_values.gather(1, a_batch.unsqueeze(1)).squeeze(1)

        # Expected Q-values computed with the target network; no gradient flows
        # through the target.
        with torch.no_grad():
            next_q_values = self.tg_net.policy(s_n_batch)
            next_q_value = next_q_values.max(1)[0]
            expected_q_value = r_batch + self.gamma * next_q_value * (1 - done_batch)  # target from the Bellman equation

        ## Loss computation
        # loss = (q_value - expected_q_value).pow(2).mean()
        # loss = self.criteria(q_value, expected_q_value)
        loss = F.smooth_l1_loss(q_value, expected_q_value)

        ## ---- Optimization step ------- ##
        self.optimizer.zero_grad()
        loss.backward()
        # nn.utils.clip_grad_norm_(self.q_net.parameters(), args.grad_clip)
        self.optimizer.step()
        ## ---- Optimization step ------- ##

        return loss
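

# ------------------------------------------------------------------------------
# Illustrative usage sketch, not part of the original file. It assumes a
# classic Gym-style environment (reset() returns an observation, step() returns
# a 4-tuple), that Model is a torch.nn.Module, and uses the hypothetical
# select_action() sketched above; the periodic hard copy of q_net into tg_net
# is the usual DQN target-network update, which the original project may
# perform elsewhere.
# ------------------------------------------------------------------------------
if __name__ == "__main__":
    import gym  # assumed dependency for this sketch

    env = gym.make("CartPole-v1")
    agent = DQNAgent(env.observation_space.shape[0], env.action_space.n)

    step = 0
    for episode in range(500):  # episode count chosen arbitrarily for the sketch
        state, done = env.reset(), False
        while not done:
            action = agent.select_action(state, step)
            next_state, reward, done, _ = env.step(action)
            agent.insert(state, action, reward, next_state, done)
            state, step = next_state, step + 1

            if agent.buffer_size() >= agent.batch_size:
                agent.train()

            if step % 1000 == 0:  # periodic hard update of the target network
                agent.tg_net.load_state_dict(agent.q_net.state_dict())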