forked from wanyuL/ReinforcementRats2021
-
Notifications
You must be signed in to change notification settings - Fork 0
/
autoencoder.py
163 lines (131 loc) · 5.04 KB
/
autoencoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import numpy as np
import matplotlib.pyplot as plt
import itertools
from tqdm.notebook import tqdm, trange
import torch
from torch.utils.data import DataLoader
nn = torch.nn
import nmastandard as nmas
class AE(nn.Module):
    """
    Fully-connected autoencoder: an encoder MLP maps the flattened input into a
    latent space and a decoder MLP maps the latent code back to the input size.
    """

    def __init__(self, in_dim, latent_dim, enc_lst, dec_lst):
        """
        Initialize AutoEncoder
        ---
        Parameters:
        * in_dim : Number of input dimensions
        * latent_dim : Size of latent space
        * enc_lst : List of number of hidden nodes at each encoder layer
                    (may be empty: the encoder is then a single linear map)
        * dec_lst : List of number of hidden nodes at each decoder layer
                    (may be empty: the decoder is then a single linear map)
        """
        super().__init__()
        self.in_dim = in_dim
        self.out_dim = in_dim
        # Encoder first, then decoder: the layer creation order is unchanged.
        self.enc_model = self._build_mlp([in_dim, *enc_lst, latent_dim])
        self.dec_model = self._build_mlp([latent_dim, *dec_lst, in_dim])

    @staticmethod
    def _build_mlp(dims):
        """
        Build a stack of Linear layers joining consecutive sizes in `dims`
        ---
        Parameters:
        * dims (list of int) : Layer widths, from input size to output size
        Returns:
        * (nn.Sequential) : Linear layers with a ReLU after every layer except the last
        """
        layers = []
        last_idx = len(dims) - 2
        for idx, (n_in, n_out) in enumerate(zip(dims[:-1], dims[1:])):
            layers.append(nn.Linear(n_in, n_out, bias=True))
            # The output layer stays linear (no activation).
            if idx < last_idx:
                layers.append(nn.ReLU())
        return nn.Sequential(*layers)

    def encode(self, x):
        '''
        Encodes x into the latent space
        ---
        Parameters:
        * x (torch.tensor) : The dataset to encode (size: num_examples x in_dim)
        Returns:
        * l (torch.tensor) : Projection into the latent space of original data (size: num_examples x latent_dim)
        '''
        return self.enc_model(x)

    def decode(self, l):
        '''
        Decode l from the latent space into the initial dataset
        ---
        Parameters:
        * l (torch.tensor) : The encoded latent space representation (size: num_examples x latent_dim)
        Returns:
        * x (torch.tensor) : Approximation of the original dataset encoded (size: num_examples x in_dim)
        '''
        return self.dec_model(l)

    def forward(self, x):
        '''
        Feed raw dataset through encoder -> decoder model in order to generate overall approximation from latent space
        ---
        Parameters:
        * x (torch.tensor) : The dataset to encode; every dimension after the first is
                             flattened, so the flattened size per example must equal in_dim
        Returns:
        * x (torch.tensor) : Approximation of the original dataset from the encoded latent space (same size as the input)
        '''
        # reshape (not view) so non-contiguous inputs, e.g. permuted tensors, also work.
        flat_x = x.reshape(x.size(0), -1)
        h = self.encode(flat_x)
        # Restore the caller's original shape.
        return self.decode(h).view(x.size())
def train_autoencoder(autoencoder, dataset, device, epochs=20, batch_size=250,
                      seed=0):
    '''
    Train the provided "autoencoder" model on the provided tensor dataset.
    ---
    Parameters:
    * autoencoder (AE) : AE model to train
    * dataset (torch.tensor) : The dataset to encode (size: num_examples x in_dim)
    * device (str) : Device to use for training ('cuda' or 'cpu')
    * epochs (int) : Number of iterations through the entire dataset on which to train
    * batch_size (int) : Number of examples in randomly sampled batches to pass through the model
    * seed (int) : Seed for the generator that drives the DataLoader shuffling
    Returns:
    * mse_loss (torch.tensor) : Mean Squared Error loss at each training step, on CPU
                                (one entry per batch, size: epochs * batches_per_epoch)
    '''
    # Use the `device` argument, not a module-level global, so the function
    # also works when this module is imported rather than run as a script.
    autoencoder.to(device)
    optim = torch.optim.Adam(autoencoder.parameters(), lr=1e-2)
    loss_fn = nn.MSELoss()

    g_seed = torch.Generator()
    g_seed.manual_seed(seed)
    loader = DataLoader(dataset,
                        batch_size=batch_size,
                        shuffle=True,
                        pin_memory=True,
                        num_workers=2,
                        worker_init_fn=nmas.seed_worker,
                        generator=g_seed)

    # One slot per optimisation step. len(loader) also counts the final partial
    # batch, which `len(dataset) // batch_size` would miss.
    mse_loss = torch.zeros(epochs * len(loader), device=device)
    i = 0
    for epoch in trange(epochs, desc='Epoch'):
        for im_batch in loader:
            im_batch = im_batch.to(device)
            optim.zero_grad()
            reconstruction = autoencoder(im_batch)
            # Flatten by the actual batch length: the last batch of an epoch
            # can be shorter than batch_size.
            n_in_batch = im_batch.size(0)
            loss = loss_fn(reconstruction.view(n_in_batch, -1),
                           target=im_batch.view(n_in_batch, -1))
            loss.backward()
            optim.step()
            mse_loss[i] = loss.detach()
            i += 1
        # Periodic progress report: `i` already points one past the last
        # recorded step, so the most recent loss lives at i - 1.
        if epoch % 100 == 0 and i > 0:
            print(mse_loss[i - 1])

    # After training completes, make sure the model is on CPU so we can easily
    # do more visualizations and demos.
    autoencoder.to('cpu')
    return mse_loss.cpu()
if __name__ == '__main__':
    # Seeding and device selection are delegated to the nmastandard helpers.
    SEED = 2021
    nmas.set_seed(seed=SEED)
    DEVICE = nmas.set_device()

    # Toy dataset: draw 100000 integers v from [0, 10000) and turn each one
    # into the 3-feature row [v - 1, v, v + 1].
    centers = torch.tensor(np.random.choice(10000, size=100000)).float()
    n_samples = centers.size(0)
    offsets = torch.tensor(np.tile(np.arange(-1, 2), (n_samples, 1)))
    x = torch.tile(centers.view(-1, 1), [1, 3]) + offsets

    # 3 inputs -> 5 hidden -> 1 latent -> 5 hidden -> 3 outputs.
    model = AE(x.size(-1), 1, [5], [5])
    loss = train_autoencoder(model, x, DEVICE,
                             epochs=100, batch_size=250, seed=0)

    # Per-step training loss curve.
    plt.plot(loss)