-
Notifications
You must be signed in to change notification settings - Fork 0
/
lstm.py
148 lines (128 loc) · 5.57 KB
/
lstm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from __future__ import absolute_import, division, print_function
# Import TensorFlow v2.
import tensorflow as tf
from tensorflow.keras import Model, layers
import numpy as np
import random
# Dataset parameters.
num_classes = 2 # Two classes: linear sequence (label 0) or random sequence (label 1).
seq_max_len = 20 # Maximum sequence length; shorter sequences are padded up to it.
seq_min_len = 5 # Minimum sequence length (before padding).
masking_val = -1 # -1 represents the mask and is used to pad sequences to a common max length.
max_value = 10000 # Exclusive upper bound for generated ints; also the divisor used to rescale them.
# Training parameters.
learning_rate = 0.001
training_steps = 2000
batch_size = 64
display_step = 100 # Loss and accuracy are printed every 'display_step' steps.
# Network parameters.
num_units = 32 # Number of units in the LSTM layer.
# ====================
# TOY DATA GENERATOR
# ====================
def toy_sequence_data():
    """Yield an endless stream of padded toy sequences with their labels.

    Each sample is, with 50% probability, one of:
    - Class 0: a linear sequence (e.g. [1, 2, 3, 4, ...])
    - Class 1: a random sequence (e.g. [9, 3, 10, 7, ...])

    A sequence has a random length in [seq_min_len, seq_max_len] and its
    integer values are divided by 'max_value', which rescales them to
    [0., 1.). It is then right-padded with 'masking_val' (here -1) up to
    'seq_max_len', because arrays with inconsistent dimensions cannot be
    fed to TensorFlow. The padded positions are meant to be ignored
    downstream through masking.

    Yields:
        A (sequence, label) pair of float32 numpy arrays: the padded
        sequence of length 'seq_max_len' and a scalar label (0 or 1).
    """
    while True:
        # Length and start value are both drawn for every sample, even
        # though only the linear branch uses the start value.
        length = random.randint(seq_min_len, seq_max_len)
        first = random.randint(0, max_value - length)
        is_linear = random.random() < .5
        if is_linear:
            # Consecutive integers starting at 'first'.
            raw = np.arange(start=first, stop=first + length)
        else:
            # Independent random integers below 'max_value'.
            raw = np.random.randint(max_value, size=length)
        label = 0 if is_linear else 1
        # Rescale, then pad on the right with the masking value so every
        # sample ends up with the same length.
        scaled = raw / max_value
        padded = np.pad(
            scaled,
            mode='constant',
            pad_width=(0, seq_max_len - length),
            constant_values=masking_val,
        )
        yield np.array(padded, dtype=np.float32), np.array(label, dtype=np.float32)
# Build the input pipeline with the tf.data API: wrap the sample generator,
# then shuffle (5000-element buffer), batch and prefetch one batch ahead.
train_data = tf.data.Dataset.from_generator(
    toy_sequence_data,
    output_types=(tf.float32, tf.float32),
)
train_data = (
    train_data
    .repeat()
    .shuffle(5000)
    .batch(batch_size)
    .prefetch(1)
)
# Create LSTM Model.
class LSTM(Model):
    """Sequence classifier built as Masking -> LSTM -> Dense.

    The Masking layer marks timesteps equal to 'masking_val' (-1) so that
    the LSTM layer applied on top of it can skip the padded positions.
    The final Dense layer outputs one score per class ('num_classes').
    """

    def __init__(self):
        """Create the masking, recurrent and output layers."""
        super(LSTM, self).__init__()
        # Timesteps equal to the masking value are flagged as padding.
        self.masking = layers.Masking(mask_value=masking_val)
        # Recurrent layer applied over the masked input.
        self.lstm = layers.LSTM(units=num_units)
        # Fully connected output layer (2 classes: linear or random seq).
        self.out = layers.Dense(num_classes)

    def call(self, x, is_training=False):
        """Run the forward pass.

        Args:
            x: batch of padded sequences; it is reshaped to
                (batch_size, seq_max_len, 1) before being processed.
            is_training: when True, raw logits are returned; otherwise
                softmax is applied to them.

        Returns:
            Logits when 'is_training' is True, softmax probabilities
            otherwise.
        """
        # A RNN layer expects a 3-dim input (batch_size, seq_len, num_features).
        sequences = tf.reshape(x, shape=[-1, seq_max_len, 1])
        logits = self.out(self.lstm(self.masking(sequences)))
        if is_training:
            # The tf cross-entropy op expects logits without softmax.
            return logits
        return tf.nn.softmax(logits)
# Build the LSTM model: a single module-level instance that the training
# code below runs and updates.
lstm_net = LSTM()
# Cross-Entropy Loss.
# Note that this will apply 'softmax' to the logits.
def cross_entropy_loss(x, y):
    """Return the batch-averaged sparse softmax cross-entropy.

    Args:
        x: logits (softmax must NOT have been applied; the TensorFlow op
            applies it internally).
        y: class labels; they are cast to int64 before use.

    Returns:
        The mean of the per-example cross-entropy losses.
    """
    # The tf cross-entropy function wants integer class indices.
    int_labels = tf.cast(y, tf.int64)
    per_example = tf.nn.sparse_softmax_cross_entropy_with_logits(
        labels=int_labels,
        logits=x,
    )
    return tf.reduce_mean(per_example)
# Accuracy metric.
def accuracy(y_pred, y_true):
    """Return the fraction of predictions whose argmax matches the label.

    Args:
        y_pred: per-class scores; the predicted class is the index of the
            highest score along axis 1.
        y_true: true class labels; they are cast to int64 for comparison.

    Returns:
        The mean (over the last axis) of the 0/1 correctness indicators.
    """
    predicted = tf.argmax(y_pred, 1)
    expected = tf.cast(y_true, tf.int64)
    hits = tf.cast(tf.equal(predicted, expected), tf.float32)
    return tf.reduce_mean(hits, axis=-1)
# Adam optimizer, configured with the module-level 'learning_rate'.
optimizer = tf.optimizers.Adam(learning_rate)
# Optimization process.
def run_optimization(x, y):
    """Perform one gradient-descent update of 'lstm_net' on a batch.

    Args:
        x: batch of input sequences passed to the model.
        y: matching batch of labels passed to the loss.
    """
    # Record the forward pass and the loss so they can be differentiated.
    with tf.GradientTape() as tape:
        logits = lstm_net(x, is_training=True)
        batch_loss = cross_entropy_loss(logits, y)
    # Differentiate the loss w.r.t. the model's trainable variables and
    # let the optimizer apply the resulting update.
    weights = lstm_net.trainable_variables
    grads = tape.gradient(batch_loss, weights)
    optimizer.apply_gradients(zip(grads, weights))
# Train on 'training_steps' batches, reporting metrics periodically.
for step, (batch_x, batch_y) in enumerate(train_data.take(training_steps), 1):
    # One optimization step on the current batch.
    run_optimization(batch_x, batch_y)
    # Report only on the first step and on every 'display_step'-th step.
    if step != 1 and step % display_step != 0:
        continue
    # Forward pass with the freshly updated weights; is_training=True keeps
    # the output as raw logits, which is what the loss function expects.
    pred = lstm_net(batch_x, is_training=True)
    loss = cross_entropy_loss(pred, batch_y)
    acc = accuracy(pred, batch_y)
    print("step: %i, loss: %f, accuracy: %f" % (step, loss, acc))