forked from StefOe/indrnn-pytorch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
addition_test.py
144 lines (121 loc) · 5.18 KB
/
addition_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""Module using IndRNNCell to solve the addition problem
The addition problem is stated in https://arxiv.org/abs/1803.04831. The
hyper-parameters are taken from that paper as well. The network should
converge to a MSE around zero after 1500-3000 steps.
"""
from indrnn import IndRNNv2
from indrnn import IndRNN
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import argparse
from time import time
# Command-line interface of the experiment. Parsing happens at import time
# because the model classes and helper functions in this file read ``args``
# and ``RECURRENT_MAX`` as module-level globals.
parser = argparse.ArgumentParser(description='PyTorch IndRNN Addition test')
# Default parameters taken from https://arxiv.org/abs/1803.04831
parser.add_argument('--lr', type=float, default=0.0002,
                    help='learning rate (default: 0.0002)')
parser.add_argument('--time-steps', type=int, default=100,
                    help='length of addition problem (default: 100)')
parser.add_argument('--n-layer', type=int, default=2,
                    help='number of layer of IndRNN (default: 2)')
parser.add_argument('--hidden_size', type=int, default=128,
                    help='number of hidden units in one IndRNN layer(default: 128)')
parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')
parser.add_argument('--batch-norm', action='store_true', default=False,
                    help='enable frame-wise batch normalization after each layer')
parser.add_argument('--bidirectional', action='store_true', default=False,
                    help='enable bidirectional processing')
parser.add_argument('--log-interval', type=int, default=100,
                    help='after how many iterations to report performance')
# The value is matched case-insensitively when the model is built.
parser.add_argument('--model', type=str, default="IndRNN",
                    help='which model to optimize: IndRNN, IndRNNv2 or LSTM '
                         '(case-insensitive, default: IndRNN)')
# Default parameters taken from https://arxiv.org/abs/1511.06464
parser.add_argument('--batch-size', type=int, default=50,
                    help='input batch size for training (default: 50)')
args = parser.parse_args()

# Reject sequence lengths the rest of the script cannot handle: 0 would divide
# by zero just below, and anything under 2 leaves an empty first half when the
# batch generator places one marker in each half of the sequence.
if args.time_steps < 2:
    parser.error('--time-steps must be at least 2')

# Use the GPU only when it is both available and not explicitly disabled.
args.cuda = not args.no_cuda and torch.cuda.is_available()

# Bound for the recurrent weights: 2 ** (1 / time_steps), i.e. the value whose
# time_steps-th power is exactly 2.
RECURRENT_MAX = pow(2, 1 / args.time_steps)
class Net(nn.Module):
    """Recurrent stack followed by a linear read-out with a single output.

    The recurrent part is built by calling ``model`` (``IndRNN`` by default).
    Batch normalization and bidirectionality are taken from the module-level
    ``args``; the recurrent-weight bound comes from ``RECURRENT_MAX``.

    Args:
        input_size: Number of features per input element.
        hidden_size: Number of hidden units per recurrent layer.
        n_layer: Number of recurrent layers.
        model: Callable used to construct the recurrent stack.
    """

    def __init__(self, input_size, hidden_size, n_layer=2, model=IndRNN):
        super().__init__()

        def init_bounded(weight):
            # Uniform draw inside [-RECURRENT_MAX, RECURRENT_MAX].
            return nn.init.uniform_(weight, -RECURRENT_MAX, RECURRENT_MAX)

        def init_ones(weight):
            return nn.init.constant_(weight, 1)

        # One initializer per layer: bounded-uniform for the first entry,
        # constant one for every remaining layer.
        recurrent_inits = [init_bounded] + [init_ones] * (n_layer - 1)

        self.indrnn = model(
            input_size,
            hidden_size,
            n_layer,
            batch_norm=args.batch_norm,
            bidirectional=args.bidirectional,
            hidden_max_abs=RECURRENT_MAX,
            recurrent_inits=recurrent_inits,
        )

        # The read-out is sized for twice the hidden width when bidirectional.
        if args.bidirectional:
            readout_features = hidden_size * 2
        else:
            readout_features = hidden_size
        self.lin = nn.Linear(readout_features, 1)
        self.lin.bias.data.fill_(.1)
        self.lin.weight.data.normal_(0, .01)

    def forward(self, x, hidden=None):
        """Return one scalar prediction per sequence.

        Only the last entry along the first axis of the recurrent output is
        fed to the read-out (presumably the final time step -- the inputs
        produced by this script are time-major; confirm against ``model``).
        """
        outputs, _ = self.indrnn(x, hidden)
        final_entry = outputs[-1]
        return self.lin(final_entry).squeeze(1)
class LSTM(nn.Module):
    """LSTM baseline: one ``nn.LSTM`` layer plus a linear read-out.

    The input width is fixed at 2 and the hidden width is read from the
    module-level ``args.hidden_size``.
    """

    def __init__(self):
        super().__init__()
        width = args.hidden_size
        self.cell1 = nn.LSTM(2, width)
        self.lin = nn.Linear(width, 1)

    def forward(self, x, hidden=None):
        """Return one scalar per sequence, computed from the last output step.

        ``nn.LSTM`` is used with its default time-major layout, so ``x`` is
        indexed as (time, batch, feature) and ``outputs[-1]`` is the last step.
        """
        outputs, _ = self.cell1(x, hidden)
        last_step = outputs[-1]
        return self.lin(last_step).squeeze(1)
def _build_model(name):
    """Instantiate the network selected by ``name`` (matched case-insensitively).

    Raises:
        ValueError: If ``name`` is not one of IndRNN, IndRNNv2 or LSTM.
    """
    key = name.lower()
    if key == "indrnn":
        return Net(2, args.hidden_size, args.n_layer)
    if key == "indrnnv2":
        return Net(2, args.hidden_size, args.n_layer, IndRNNv2)
    if key == "lstm":
        return LSTM()
    raise ValueError(
        "unsupported cell model {!r}; expected one of: IndRNN, IndRNNv2, LSTM"
        .format(name))


def main():
    """Train the selected model on freshly generated addition-problem batches.

    The loop never terminates on its own: every ``args.log_interval``
    iterations it prints the mean MSE of that interval together with the
    elapsed wall-clock time. Stop it manually once the loss has converged.

    Raises:
        ValueError: If ``args.model`` does not name a supported model.
    """
    # build model
    model = _build_model(args.model)
    if args.cuda:
        model.cuda()
    optimizer = torch.optim.Adam(model.parameters(), lr=args.lr)

    # Train the model
    model.train()
    step = 0
    while True:
        losses = []
        start = time()
        for _ in range(args.log_interval):
            # Generate new input data
            data, target = get_batch()
            if args.cuda:
                data, target = data.cuda(), target.cuda()
            # The optimizer owns every model parameter, so clearing the
            # gradients through it is equivalent to clearing them on the model.
            optimizer.zero_grad()
            out = model(data)
            loss = F.mse_loss(out, target)
            loss.backward()
            optimizer.step()
            losses.append(loss.item())
            step += 1
        print(
            "MSE after {} iterations: {} ({} sec.)".format(
                step, np.mean(losses), time() - start))
def get_batch():
    """Generate one random batch of the adding problem.

    Sizes are read from the module-level ``args`` (``time_steps`` and
    ``batch_size``).

    Returns:
        A pair ``(inputs, targets)``. ``inputs`` has shape
        (time_steps, batch_size, 2): channel 0 holds uniform random values,
        channel 1 is a 0/1 marker with exactly one 1 in each half of the
        sequence. ``targets`` has shape (batch_size,) and holds, per
        sequence, the sum of the two marked values.
    """
    n_steps = args.time_steps
    n_seqs = args.batch_size
    midpoint = n_steps // 2

    # Value channel: uniform samples in [0, 1).
    values = torch.rand(n_steps, n_seqs, requires_grad=False)

    # Marker channel: all zeros except one position per half, per sequence.
    markers = torch.zeros_like(values)
    for column in range(n_seqs):
        # Draw the first-half position before the second-half one so the
        # NumPy random stream is consumed in a fixed order.
        early = np.random.randint(midpoint)
        late = np.random.randint(midpoint, n_steps)
        markers[early, column] = 1
        markers[late, column] = 1

    # Pair values and markers along a new trailing axis.
    inputs = torch.stack((values, markers), dim=-1)
    # Multiplying by the markers zeroes every unmarked value before summing.
    targets = (values * markers).sum(dim=0)
    return inputs, targets
# Start training only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()