# CNNet.py
import tensorflow as tf
import numpy as np
import time
from tensorflow.examples.tutorials.mnist import input_data

mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)
relu = tf.nn.relu


def weight_variable(shape, name=None):
    """Weight tensor, Glorot-style truncated normal init
    (uses the first two dims of `shape`)."""
    initial = tf.truncated_normal(shape,
                                  stddev=tf.sqrt(2. / (shape[0] + shape[1])))
    return tf.Variable(initial, name=name)


def bias_variable(shape, name=None):
    """Bias tensor, initialized to a small positive constant."""
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial, name=name)


def conv2d(x, W, pad='SAME', strides=1):
    """2D convolutional operation. Default stride of 1."""
    return tf.nn.conv2d(x, W,
                        strides=[1, strides, strides, 1], padding=pad)
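
# Shape note (follows from pad='SAME' and strides=1): a [N, 28, 28, 1] input
# convolved with a [5, 5, 1, 20] kernel keeps its spatial size, giving
# [N, 28, 28, 20]; flattened, that is 28*28*20 = 15680 features per example,
# which matches the W_fc1 shape declared in CNN_var_init below.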
def ReparamTrickBN(W, b, beta, gamma, mu, sigma2):
    """Fold batch-norm statistics (mu, sigma2) and parameters (beta, gamma)
    into the preceding layer's weights and biases."""
    W_rep = W * gamma / np.sqrt(sigma2)
    b_rep = (b - mu) * gamma / np.sqrt(sigma2) + beta
    return W_rep, b_rep
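
# A minimal usage sketch (assumption: this helper is not called elsewhere in
# this file; the names below are hypothetical numpy arrays fetched from a
# trained model). BN computes gamma * (x - mu) / sqrt(sigma2) + beta, so
# folding it into the layer yields the same output with the BN op removed:
#   W_fold, b_fold = ReparamTrickBN(W_np, b_np, beta_np, gamma_np, mu_np, s2_np)
#   # then run conv2d(x, W_fold) + b_fold in place of BN(conv2d(x, W_np) + b_np)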
class CNNet(object):
    def __init__(self, name, lr=0.001,
                 activationFunc=relu,
                 flag_bNorm=True,
                 loadInstance=None):
        # ---------- RESET GRAPH ----------
        tf.reset_default_graph()
        # ---------- OUTPUT FOLDERS ----------
        self.logsFolder = 'log_files/'  # useful for TensorBoard logs
        self.saveFolder = 'Models/'     # useful to restore the model
        # ---------- ATTRIBUTES ----------
        self.name = name
        self.lr = lr
        self.mean = None
        if flag_bNorm:
            self.act = lambda net: activationFunc(self.batchNorm(net))
        else:
            self.act = activationFunc
        # ---------- DATA PLACEHOLDERS ----------
        with tf.variable_scope('Input'):
            # tf Graph input: MNIST image of shape 28*28*1, flattened to 784
            self.X = tf.placeholder(tf.float32, [None, 784], name='X')
            # 0-9 digit recognition, 10 classes
            self.y = tf.placeholder(tf.float32, [None, 10], name='y')
            # Train/inference switch for batch normalization
            self.is_training = tf.placeholder(tf.bool, name='flag_train')
        # ---------- GRAPH DEFINITION ----------
        with tf.name_scope('Model'):
            self.CNN_var_init()
            self.logits = self.CNN_feedforward()
            self.prob = tf.nn.softmax(self.logits)
        with tf.name_scope('Loss'):
            self.loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(
                logits=self.logits, labels=self.y))
            # With updates_collections=None in batchNorm, this collection is
            # empty and the dependency is a no-op; kept for safety.
            update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS, scope=name)
            with tf.control_dependencies(update_ops):
                self.train_op = tf.train.AdamOptimizer(self.lr).minimize(
                    self.loss, name="train_op")
        with tf.name_scope('Accuracy'):
            self.pred = tf.argmax(self.logits, axis=1, name="prediction")
            self.accuracy = tf.reduce_mean(tf.cast(
                tf.equal(self.pred, tf.argmax(self.y, axis=1)), tf.float32))
        # ---------- TRACK BATCH LOSS AND ACCURACY ----------
        tf.summary.scalar("loss", self.loss)
        tf.summary.scalar("acc", self.accuracy)
        self.summary_op = tf.summary.merge_all()
        # ---------- SESSION INITIALIZATION & RESTORE ----------
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())
        self.saver = tf.train.Saver()
        if loadInstance:
            self.restore(loadInstance)

    def __del__(self):
        self.sess.close()
    # ----------------------------------------------------------
    # ----------------------INITIALIZATION----------------------
    # ----------------------------------------------------------
    def CNN_var_init(self):
        # Convolutional layer 1: 5x5 kernels, 1 input channel, 20 filters
        self.W_c1 = weight_variable([5, 5, 1, 20], 'W_c1')
        self.b_c1 = bias_variable([20], 'b_c1')
        # Fully connected layer 1: 28*28*20 = 15680 flattened features -> 1000
        self.W_fc1 = weight_variable([15680, 1000], 'W_fc1')
        self.b_fc1 = bias_variable([1000], 'b_fc1')
        # Fully connected layer 2: 1000 -> 10 class logits
        self.W_fc2 = weight_variable([1000, 10], 'W_fc2')
        self.b_fc2 = bias_variable([10], 'b_fc2')
    # ----------------------------------------------------------
    # ---------------------DATA PREPROCESSING-------------------
    # ----------------------------------------------------------
    def preproc(self, x, mean=None):
        """Preprocess input data by subtracting the per-example mean
        (http://ufldl.stanford.edu/wiki/index.php/Data_Preprocessing)."""
        # x = x * 2 - 1.0  # (disabled) rescale to [-1, 1]
        if mean is None:
            mean = tf.reduce_mean(x, axis=1, keepdims=True)
        return x - mean
    # ----------------------------------------------------------
    # ------------------------FEEDFORWARD-----------------------
    # ----------------------------------------------------------
    def batchNorm(self, net, decay=0.99):
        """Batch normalization layer from tf.contrib. With
        updates_collections=None the moving averages are updated in place,
        so no separate UPDATE_OPS run is required."""
        net = tf.contrib.layers.batch_norm(net,
                                           decay=decay,
                                           scale=True,
                                           updates_collections=None,
                                           is_training=self.is_training)
        return net
    def CNN_feedforward(self):
        """Feedforward architecture, classifying images"""
        from tensorflow.contrib.layers import flatten
        net = self.preproc(tf.reshape(self.X, [-1, 28, 28, 1]))
        # Convolutional layer 1 (no pooling; spatial size is preserved)
        with tf.name_scope('Conv1'):
            net = conv2d(net, self.W_c1) + self.b_c1
            net = self.act(net)
        # Flattening: [N, 28, 28, 20] -> [N, 15680]
        net = flatten(net)
        # Fully connected layer 1, activation (batch-normalized ReLU by default)
        with tf.name_scope('FC1'):
            net = tf.matmul(net, self.W_fc1) + self.b_fc1
            net = self.act(net)
        # Fully connected layer 2: raw logits (softmax applied in the loss)
        with tf.name_scope('FC2'):
            net = tf.matmul(net, self.W_fc2) + self.b_fc2
        return net
    # ----------------------------------------------------------
    # -----------------------SAVE & RESTORE---------------------
    # ----------------------------------------------------------
    def save(self, path=None):
        """Save the TensorFlow model"""
        if path is None:
            path = self.saveFolder + self.name
        self.saver.save(self.sess, path)
        print("INFO: TF Model saved in file: %s" % path)

    def restore(self, path=None):
        """Restore the TF model from file"""
        if path is None:
            path = self.saveFolder + self.name
        self.saver.restore(self.sess, save_path=path)
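
    # Restore sketch (hypothetical path): a saved model can also be reloaded
    # at construction time, since __init__ calls restore(loadInstance), e.g.:
    #   net = CNNet('mnist_cnn', loadInstance='Models/mnist_cnn')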
    # ----------------------------------------------------------
    # --------------------------LEARNING------------------------
    # ----------------------------------------------------------
    def train(self, batch_size=128, epochs=100,
              display_step=1, saveBest=True):
        """Train the model, going through the MNIST train set `epochs` times"""
        print(self.name)
        print("* START TRAIN {l_r: %.4f; epochs: %d; batch: %d, TrAcc: %.1f %%, ValAcc: %.1f %%}" %
              (self.lr, epochs, batch_size, 100 * self.benchmark('TRAINING'),
               100 * self.benchmark('VALIDATION')))
        # Op to write logs to TensorBoard
        sum_wr = tf.summary.FileWriter(self.logsFolder, self.sess.graph)
        # Training cycle
        t0 = time.time()
        total_batch = mnist.train.num_examples // batch_size
        for epoch in range(epochs):
            avg_cost = 0.
            # Loop over all batches
            for i in range(total_batch):
                batch_xs, batch_ys = mnist.train.next_batch(batch_size)
                _, c, summary = self.sess.run(
                    [self.train_op, self.loss, self.summary_op],
                    feed_dict={self.X: batch_xs, self.y: batch_ys,
                               self.is_training: True})
                sum_wr.add_summary(summary, epoch * total_batch + i)
                avg_cost += c / total_batch
            # Print training results per epoch
            vl_acc = self.benchmark('VALIDATION')
            print(" [%.1f] Epoch: %02d | Loss=%.9f | ValAcc=%.3f" % (
                time.time() - t0, epoch + 1, avg_cost, vl_acc * 100))
        # Evaluate the model with the test accuracy
        print("* END TRAIN in %.1f seconds => TestAcc: %.3f" % (
            time.time() - t0, 100 * self.benchmark('TEST')))
    # ----------------------------------------------------------
    # --------------------------TESTING-------------------------
    # ----------------------------------------------------------
    def benchmark(self, dataset='TEST', batch_size=1000):
        """Average accuracy over the TRAINING, VALIDATION or TEST set"""
        if dataset == 'TRAINING':
            benchmark_data = mnist.train
        elif dataset == 'VALIDATION':
            benchmark_data = mnist.validation
        else:  # dataset == 'TEST'
            benchmark_data = mnist.test
        N = benchmark_data.num_examples // batch_size
        total_acc = 0
        # Op to write logs to TensorBoard
        sum_wr = tf.summary.FileWriter(self.logsFolder, self.sess.graph)
        for batch_i in range(N):
            xs, ys = benchmark_data.next_batch(batch_size, shuffle=False)
            step_acc, summ = self.sess.run([self.accuracy, self.summary_op],
                                           {self.X: xs, self.y: ys,
                                            self.is_training: False})
            total_acc += step_acc
            sum_wr.add_summary(summ, batch_i)
        return total_acc / N

    def predict(self, image):
        """Predicted digit classes for a batch of flattened images"""
        return self.sess.run(self.pred, {self.X: image, self.is_training: False})
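

# Minimal usage sketch (assumption: this demo block is not part of the
# original file; MNIST_data/ is downloaded on first run by
# input_data.read_data_sets above, and 'mnist_cnn' is a hypothetical name).
if __name__ == '__main__':
    import os
    os.makedirs('Models', exist_ok=True)  # the saver needs the folder to exist
    # Build a batch-normalized CNN and train it briefly
    net = CNNet('mnist_cnn', lr=0.001, flag_bNorm=True)
    net.train(batch_size=128, epochs=2)
    net.save()  # writes to Models/mnist_cnn
    # Classify the first test image (the model expects flattened 784-vectors)
    digit = net.predict(mnist.test.images[:1])
    print("Predicted digit:", digit[0])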