Skip to content

Latest commit

 

History

History
1526 lines (1287 loc) · 56.9 KB

2018-12-04-卷积神经网络-coding.md

File metadata and controls

1526 lines (1287 loc) · 56.9 KB
title date categories tags mathjax
卷积神经网络-coding
2018-12-04 03:54:50 -0800
Coding
Code
CNN
Estimator
Keras
MNIST
CIFAR-10
Kaggle dog & cat
true

1. 数据集说明

1.1 多分类图像数据集-MNIST

  • 数据来源:tf.keras.datasets.mnist.load_data
  • 数据集形状:训练集60000个样本,测试集10000个样本,每个样本包括两个部分:图片数据和标签,每个样本的图片数据是一个$28 \times 28$的数组,即灰度图像,大小范围0255,标签是图片对应的数字09,数据集中每个数字出现不是均等的,训练集0: 5923, 1: 6742, 2: 5958, 3: 6131, 4: 5842, 5: 5421, 6: 5918, 7: 6265, 8: 5851, 9: 5949,测试集0: 980, 1: 1135, 2: 1032, 3: 1010, 4: 982, 5: 892, 6: 958, 7: 1028, 8: 974, 9: 1009
  • 数据集划分:在训练集中随机选出20%数据作为验证集,剩下80%用于训练,验证集用于调整超参数以及网络架构;
  • 性能度量:accuracy。

1.2 二分类图像数据集-kaggle dog & cat

  • 数据来源:Dogsvs.CatsRedux:KernelsEdition|Kaggle
  • 数据集形状:训练集25000个样本,测试集12500个样本,每个样本都是一张彩色图片,大小不定,训练集中图片名称即包含标签信息,测试集无标签,最后需要提交预测的结果到kaggle进行评分,预测结果为图片标签为dog的概率;
  • 数据集划分:在训练集中随机选出20%数据作为验证集,剩下80%用于训练,验证集用于调整超参数以及网络架构;
  • 性能度量:交叉熵或accuracy。

1.3 多分类数据集-CIFAR-10

  • 数据来源:tf.keras.datasets.cifar10.load_data
  • 数据集形状:训练集50000个样本,测试集10000个样本,每个样本包括两个部分:图片数据和标签,每个样本的图片数据是一个$32 \times 32 \times 3$的数组,即彩色图像,大小范围0255,标签是图片对应的种类09,分别对应airplane, automobile, bird, cat, deer, dog, frog, horse, ship, truck,数据集中每个种类出现是均等的;
  • 数据集划分:在训练集中随机选出20%数据作为验证集,剩下80%用于训练,验证集用于调整超参数以及网络架构;
  • 性能度量:accuracy。

2. Tensorflow卷积神经网络应用

参考:

Github-TensorFlow-Examples Tensorflow Tutorials

2.1 Eager模式实现手写数字分类

数据集使用的是Tensorflow下的MNIST

from __future__ import absolute_import, print_function, division
import tensorflow as tf

# Eager模式必须手动开启,2.0版本将会是默认,Eager模型可以边运行边观察结果
tf.enable_eager_execution()
tfe = tf.contrib.eager

print('Tensorflow version: ', tf.VERSION, '\n', 'Eager mode: ', tf.executing_eagerly())

# 设置超参数
learning_rate = 1e-4
num_steps = 20000
batch_size = 64
display_step = 100

num_classes = 10

# 获取数据
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

print('train size:', x_train.shape, y_train.shape)
print('test size:', x_test.shape, y_test.shape)

# 这里需要reshape图片,原始图片是28*28,这里转换成28*28*1,标签需要转换成onehot变量便于后面计算交叉熵,axis=-1,可以使标签转换成m*depth的形状
x_train = tf.reshape(tf.cast(x_train, tf.float32), shape=[-1, 28, 28, 1])
x_test = tf.reshape(tf.cast(x_test, tf.float32), shape=[-1, 28, 28, 1])
y_train = tf.one_hot(y_train, depth=10, axis=-1)
y_test = tf.one_hot(y_test, depth=10, axis=-1)

# 构造输入的dataset,注意Eager模式需要使用tfe调用迭代器Iterator
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(1000).batch(batch_size)
dataset_iter = tfe.Iterator(dataset)

# 定义一个CNN类,Eager模式需要继承自tfe.Network,推荐废弃,使用keras取代
class CNN(tfe.Network):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv2d_1 = self.track_layer(
            tf.layers.Conv2D(32, 5, padding='SAME', activation='relu'))
        self.conv2d_2 = self.track_layer(
            tf.layers.Conv2D(64, 5, padding='SAME', activation='relu'))
        self.maxpool = self.track_layer(
            tf.layers.MaxPooling2D(2, 2, padding='SAME'))
        self.flatten = self.track_layer(
            tf.layers.Flatten()) 
        self.fclayer = self.track_layer(
            tf.layers.Dense(1024, activation='relu'))
        self.dropout = self.track_layer(
            tf.layers.Dropout(0.5))
        self.out_layer = self.track_layer(
            tf.layers.Dense(num_classes))
    
    def call(self, x, training=True):
        x = self.conv2d_1(x)
        x = self.maxpool(x)
        x = self.conv2d_2(x)
        x = self.maxpool(x)
        x = self.flatten(x)
        x = self.fclayer(x)
        if training:
            x = self.dropout(x)
        return self.out_layer(x)

cnn = CNN()

# 定义损失函数,softmax_cross_entropy_with_logits_v2包含两步,首先计算softmax,再计算交叉熵,需要注意
def loss_fn(inference_fn, inputs, labels):
    return tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(
        logits = inference_fn(inputs), labels = labels))

# 定义计算准确率的函数,这里通过argmax取softmax中最高的那个概率的索引,也就是对应的数字
def accuracy_fn(inference_fn, inputs, labels, training):
    prediction = tf.nn.softmax(inference_fn(inputs, training))
    correct_pred = tf.equal(tf.argmax(prediction, 1), tf.argmax(labels, 1))
    return tf.reduce_mean(tf.cast(correct_pred, tf.float32))

# Eager模式的梯度计算方式implicit_gradients
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
grad = tfe.implicit_gradients(loss_fn)

average_loss = 0.
average_acc = 0.

for step in range(num_steps):
    try:
        d = dataset_iter.next()
    except StopIteration:
        dataset_iter = tfe.Iterator(dataset)
        d = dataset_iter.next()

    x_batch = d[0]
    y_batch = tf.cast(d[1], tf.int64)

    batch_loss = loss_fn(cnn, x_batch, y_batch)
    average_loss += batch_loss

    batch_accuracy = accuracy_fn(cnn, x_batch, y_batch, False)
    average_acc += batch_accuracy

    if step == 0:
        print("Initial loss= {:.6f}".format(average_loss))

    # Eager模式参数梯度下降
    optimizer.apply_gradients(grad(cnn, x_batch, y_batch))

    if (step + 1) % display_step == 0 or step == 0:
        if step > 0:
            average_loss /= display_step
            average_acc /= display_step
        print("Step:", '%04d' % (step + 1), " loss=",
              "{:.6f}".format(average_loss), " accuracy=",
              "{:.4f}".format(average_acc))
        average_loss = 0.
        average_acc = 0.
        
test_acc = accuracy_fn(cnn, x_test, y_test, False)
print('Testset accuracy: {:.4f}'.format(test_acc))

训练时间有点长

Testset accuracy: 0.9916

2.2 Keras实现手写数字分类

使用Keras一方面可以简化代码,比如神经网络模型构建过程简化,Dropout层自动判断属于train阶段还是evaluate阶段,另一方面输出结果自动显示,不需要手动print。所以,推荐使用keras或者其他自定义estimator完成机器学习任务。

from __future__ import absolute_import, print_function, division
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers


print('Tensorflow version: ', tf.VERSION)

learning_rate = 1e-4
steps_per_epoch = 1000 # 一般等于 样本总数/batch_size
batch_size = 64
epochs = 20 # 训练轮数,即循环使用整个数据集的次数

num_classes = 10

# 这里缩放了图像的灰度值,为了加速收敛,为了满足model的输入形状,reshape为(28,28,1),这里不需要将标签转换为onehot类型
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train , x_test = x_train / 255. , x_test / 255.
x_train = tf.reshape(x_train, [-1, 28, 28, 1])
x_test = tf.reshape(x_test, [-1, 28, 28, 1])
print('train size:', x_train.shape, y_train.shape)
print('test size:', x_test.shape, y_test.shape)

# 构建dataset,增加了repeat,是为了可以循环使用数据集
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(1000).batch(batch_size).repeat()

# keras构建模型的方式就很简单了,直接选择需要的layer堆叠起来
model = keras.Sequential([
    # Conv2D卷积层,需要定义filters,kernel_size,strides,padding,activation,首层还需要input_shape,必须是(height, width, channel)
    layers.Conv2D(32, 5, padding='SAME', activation='relu', input_shape=(28, 28, 1)),
    # MaxPool2D池化层,需要定义pool_size,strides,padding
    layers.MaxPool2D(2, padding='SAME'),
    layers.Conv2D(64, 5, padding='SAME', activation='relu'),
    layers.MaxPool2D(2, padding='SAME'),

    layers.Flatten(),
    layers.Dense(1024, activation='relu'),
    # Dropout丢弃层,这里的0.5是丢弃率,有些函数里是保留率,需要注意
    layers.Dropout(0.5),
    layers.Dense(num_classes, activation='softmax')
])

# 设置优化器optimizer,损失函数loss,这里使用sparse_categorical_crossentropy是因为label为数字,categorical_crossentropy对应label为onehot,以及evaluate时使用的准则metrics
model.compile(
    optimizer=tf.keras.optimizers.Adam(lr=learning_rate),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
# summary会展示模型的结构,包括每一层参数个数,每一层输入形状
model.summary()

model.fit(dataset, epochs=epochs, steps_per_epoch=steps_per_epoch)

model.evaluate(x_test, y_test, steps=1)

使用CPU训练大概一个半小时

loss: 0.0226 accuracy: 0.9925

2.3 基于Keras使用预训练的网络实现猫狗识别

首先导入需要的库,这里我们可以直接使用keras自带的已经训练好的网络,比如VGG16

from __future__ import division
from __future__ import absolute_import
from __future__ import print_function

from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras import layers
from tensorflow.keras import Sequential 

import tensorflow as tf
import pandas as pd
import numpy as np
import os # 用于文件路径
import shutil # 用于文件复制

print('Tensorflow version: ', tf.VERSION)

然后设置一些可能会用到的超参数,这里训练集25000张图片等于batch_size * steps_per_epoch,以及图片长宽

learning_rate = 1e-4
batch_size = 20
epochs = 30
steps_per_epoch = 1250
img_height = 150
img_width = 150
img_channels = 3

为了测试代码,我们先不使用整个数据集,而是从原始数据集中划分出一小部分数据进行测试(需要修改上面的超参数),在确定代码无误后,我们需要修改这部分代码,因为训练集是完整的train文件夹

# 我们从原始的train文件夹下,分别选择猫和狗的前2000张作为训练集,之后的500作为验证集
base_dir = 'C:/Users/Admin/Downloads/dogvscat'
original_dir = os.path.join(base_dir, 'train')

# 选择的图片存放在small文件夹下
train_dir = os.path.join(base_dir, 'small_train')
eval_dir = os.path.join(base_dir, 'small_eval')
test_dir = os.path.join(base_dir, 'test')

if not os.path.exists(train_dir):
    os.mkdir(train_dir)
    os.mkdir(eval_dir)

for i in range(2500):
    name = 'cat.{}.jpg'.format(i)
    src = os.path.join(original_dir, name)
    if i < 2000:
        dst = os.path.join(train_dir, name)
    else:
        dst = os.path.join(eval_dir, name)
    shutil.copyfile(src, dst) # 复制图片

for i in range(2500):
    name = 'dog.{}.jpg'.format(i)
    src = os.path.join(original_dir, name)
    if i < 2000:
        dst = os.path.join(train_dir, name)
    else:
        dst = os.path.join(eval_dir, name)
    shutil.copyfile(src, dst)

准备输入数据,包括两部分,一个是图片绝对路径,一个是标签(dog为1,cat为0)

# 定义一个shuffle函数,使用np.random.permutation生成随机序列,保证路径与标签一一对应
def unison_shuffled_copies(a, b):
    a = np.array(a)
    b = np.array(b)
    assert len(a) == len(b)
    p = np.random.permutation(len(a))
    return a[p], b[p]

files = os.listdir(train_dir)
train_files = [os.path.join(train_dir, name) for name in files]
train_labels = np.array(['dog' in name for name in files]).astype(np.float)
train_files, train_labels = unison_shuffled_copies(train_files, train_labels)

files = os.listdir(eval_dir)
eval_files = [os.path.join(eval_dir, name) for name in files]
eval_labels = np.array(['dog' in name for name in files]).astype(np.float)
eval_files, eval_labels = unison_shuffled_copies(eval_files, eval_labels)

# 这个地方应该是bug,Tensorflow version 1.12.0,keras的model使用predict需要target和label,而label对于需要预测的数据来说无意义,随便设置为-1
files = ['{}.jpg'.format(i) for i in range(1, 12501)]
test_files = [os.path.join(test_dir, name) for name in files]
test_labels = np.array([-1] * len(files)).astype(np.float) # 理论上不需要这个

我们需要把图片绝对路径转换为图片数据,在输入函数中解决

def image_input_fn(filenames, labels=None, shuffle=False, repeat_count=1, batch_size=1):
    # 读取数据,解码,resize图片,归一化(这里只是除以255)
    def _read_img(filename, label=None):
        img_raw = tf.read_file(filename)
        img = tf.image.decode_image(img_raw, channels=3)
        img.set_shape([None, None, None]) # decode_image需要,decode_jpeg不需要
        img = tf.image.resize_images(img, [img_height, img_width])
        img = tf.divide(img, 255.)
        img.set_shape([img_height, img_width, img_channels])
        # 理论上测试数据集的label为空,但是keras不允许,对应上面的bug
        if label is None:
            return img
        else:
            return img, label
    if labels is None:
        dataset = tf.data.Dataset.from_tensor_slices(filenames)
    else:
        dataset = tf.data.Dataset.from_tensor_slices((filenames, labels))
    dataset = dataset.map(_read_img)
    if shuffle:
        dataset = dataset.shuffle(2000)
    dataset = dataset.batch(batch_size).repeat(repeat_count)
    return dataset

定义训练模型,我这里定义了两个注释掉的部分是使用预训练VGG16,VGG16的参数不变,仅训练连接层的参数,这样需要的内存比较小;没有注释的部分是自定义的model

# vgg16 = VGG16(
#     weights='imagenet',
#     include_top=False,
#     input_shape=(img_height, img_width, img_channels))
# model = Sequential([
#     vgg16,
#     layers.Flatten(),
#     layers.Dropout(0.5),
#     layers.Dense(1, activation='sigmoid')
# ])
# vgg16.trainable = False

model = Sequential([
    layers.Conv2D(32, 5, 2, padding='SAME', activation='relu', input_shape=(img_height, img_width, img_channels)),
    layers.MaxPool2D(strides=2, padding='SAME'),
    layers.Dropout(0.3),

    layers.Conv2D(64, 5, 2, padding='SAME', activation='relu'),
    layers.MaxPool2D(strides=2, padding='SAME'),
    layers.Dropout(0.3),
    
    layers.Conv2D(128, 5, 2, padding='SAME', activation='relu'),
    layers.MaxPool2D(strides=2, padding='SAME'),
    layers.Dropout(0.3),

    layers.Flatten(),
    layers.Dense(1024, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(1, activation='sigmoid')
])

model.summary()

# 这里使用Adam优化器,对比了RMSProp,Adam收敛速度快一些
model.compile(
    optimizer=tf.train.AdamOptimizer(learning_rate),
    loss='binary_crossentropy',
    metrics=['acc'])

# 这里调用了callback,保存checkpoint,monitor和save_best_only保证了只在val_loss减小的情况下保存模型参数,period指定了保存的时机,每5个epoch保存一次
MODEL_DIR = './model/'
checkpoint_path = MODEL_DIR + "cp-{epoch:04d}.ckpt"
cp_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_path, 
                                                monitor='val_loss',
                                                save_best_only=True,
                                                verbose=1, 
                                                save_weights_only=True, 
                                                period=5)

开始训练,测试,保存最终结果

# 中途停止可以使用下面这行重新加载参数,cp-0015视具体情况修改
# model.load_weights('./model/cp-0015.ckpt')

model.fit(
    image_input_fn(
        train_files, 
        train_labels,
        shuffle=True, 
        repeat_count=epochs,
        batch_size=batch_size), 
    validation_data=image_input_fn(
        eval_files,
        eval_labels,
        shuffle=False,
        repeat_count=epochs,
        batch_size=50), # batch_size * validation_steps应当等于验证集大小
    epochs=epochs,
    steps_per_epoch=steps_per_epoch,
    validation_steps=20,
    callbacks=[cp_callback])

result = model.predict(
    image_input_fn(
        test_files,
        test_labels, # 这个地方是个bug
        shuffle=False,
        batch_size=50), 
        steps=250) # batch_size * steps应当等于测试集大小,这里调整可以减小内存需要

# 根据kaggle的经验,限制结果在[0.005, 0.995]之间有助于增加分数
path = './submission1.csv'
counter = range(1, len(result) + 1)
result = np.array(result, np.float)
result = np.squeeze(result)

def limit(x):
    if x < 0.005:
        return 0.005
    elif x > 0.995:
        return 0.995
    else:
        return x

df = pd.DataFrame({'id': counter, 'label': result})
df['label'] = df['label'].map(limit)

file = df.to_csv(path_or_buf=None, index=None)
with tf.gfile.Open(path, 'w') as f:
    f.write(file)

print('Mission Accomplished!')

在train完整训练集上,我们使用自定义model训练40轮后得到的分数为

0.24594

在train完整训练集上,我们使用VGG16训练10轮后得到的分数为(还有进步空间,前10%大约0.04左右,差距很大)

0.21975

但是使用VGG16时有一个明显的问题,设置VGG16模型的参数不参与训练,但是每一次迭代都在计算这一层,也就是说我们浪费了很多时间重复计算。所以接下来我们考虑先使用VGG16或者其他模型对训练集图片生成新的特征向量,再对特征向量建立输出,这样就只需要计算一次VGG16层。

2.4 使用预训练模型fine-tuning完成猫狗识别

参考:

Kaggle猫狗大战准确率Top 2%webapp部署

代码根据参考做了修改和完善,补充了一些忽略掉的地方

首先,需要移动图片文件

原始图片路径:

dogvscat:
    train:
        cat.0.jpg
        cat.1.jpg
        ...
        dog.0.jpg
        ...
    test:
        1.jpg
        2.jpg
        ...

需要变成

dogvscat:
    img_train:
        cat:
            cat.0.jpg
            ...
        dog:
            dog.0.jpg
            ...
    img_test:
        test:
            1.jpg
            ...

test数据很简单,复制一下就行了,对于train数据来说,需要将dog和cat分别放在各自的文件夹中,这里很简单,用shutil.copyfile拷贝就行了

import os
import shutil
path = 'train'
for i in range(0, 12500):
    name = 'dog.{}.jpg'.format(i)
    src = os.path.join(path, name)
    dst = os.path.join(os.path.join('img_train', 'dog'), name)
    shutil.copyfile(src, dst)

for i in range(0, 12500):
    name = 'cat.{}.jpg'.format(i)
    src = os.path.join(path, name)
    dst = os.path.join(os.path.join('img_train', 'cat'), name)
    shutil.copyfile(src, dst)

然后使用ImageDataGenerator和与训练好的模型生成特征向量并保存为.h5文件

"""使用预训练好的模型inception_v3,resnet50,mobilenet_v2或者其他模型也可以,单个模型也可以"""
from tensorflow import keras
from keras.applications import inception_v3
from keras.applications import resnet50
from keras.applications import mobilenet_v2
from keras.preprocessing.image import ImageDataGenerator
import h5py
import re

def write_gap(MODEL, image_size, preprocess_input):
    width = image_size[0]
    height = image_size[1]
    input_tensor = keras.Input((height, width, 3))
    x = input_tensor
    # include_top为False表示不需要最后的全连接层做预测,对于最后的数据使用avg池化处理
    base_model = MODEL(input_tensor=x, weights='imagenet', include_top=False, pooling='avg')
    model = keras.Model(inputs=base_model.input, outputs=base_model.output)
    # ImageDataGenerator需要对数据进行预处理,不同的模型预处理方式不同,比如可能需要减去均值等等,我们这里直接传入模型自带的preprocess_input方法
    gen = ImageDataGenerator(preprocessing_function=preprocess_input)
    # flow_from_directory第一个参数表示图片路径,会自动寻找分类,这里指定classes=['cat', 'dog'],也就是cat标签为0,dog标签为1,class_mode='sparse'表示使用数字作为标签而不是onehot变量,batch_size视内存决定
    train_generator = gen.flow_from_directory("img_train", image_size, shuffle=False, classes=['cat', 'dog'], class_mode='sparse',
                                              batch_size=50)
    # 同理,test文件夹不需要标签,但是也必须放在test文件夹下,表示标签数量为1,flow_from_directory读取文件的方式是os.listdir(),文件名顺序与实际读取顺序不同,所以是个伏笔,需要记录这个信息
    test_generator = gen.flow_from_directory("img_test", image_size, shuffle=False,
                                             batch_size=50, class_mode=None)
    # 这里我处理了test数据的文件名,因为kaggle的结果是按照文件名顺序排序的,我们必须最后将预测结果排序后再上传
    filenames = test_generator.filenames
    index = [int(re.split('[/.]', name)[1]) for name in filenames]
    # compile是无效的,但是不调用无法执行predict_generator
    model.compile(
        loss='binary_crossentropy',
        optimizer='adam',
        metrics=['acc']
    )

    train = model.predict_generator(train_generator, verbose=1)
    test = model.predict_generator(test_generator, verbose=1)
    # 将结果写入h5文件中,注意保存了4种数据,这里MODEL.__name__是无效的,始终为wrapper,所以后面不能同时执行
    with h5py.File("gap_%s.h5"%MODEL.__name__) as h:
        h.create_dataset("train", data=train)
        h.create_dataset("test", data=test)
        h.create_dataset("label", data=train_generator.classes)
        h.create_dataset("index", data=index)
# 每次执行一个模型的特征映射需要手动修改文件名
write_gap(inception_v3.InceptionV3, (299, 299), inception_v3.preprocess_input)

# write_gap(resnet50.ResNet50, (224, 224), resnet50.preprocess_input)

# write_gap(mobilenet_v2.MobileNetV2, (224, 224), mobilenet_v2.preprocess_input)

可以使用以下代码测试.h5文件是否正确

import h5py
import numpy as np

x_train = []
y_train = []
x_test = []
x_index = []
with h5py.File('gap_MobileNetV2.h5', 'r') as h:
    x_train.append(np.array(h['train']))
    x_test.append(np.array(h['test']))
    y_train = np.array(h['label'])
    x_index = np.array(h['index'])

x_train = np.array(x_train)
x_test = np.array(x_test)

print(x_train.shape, x_test.shape, y_train.shape, x_index.shape)
print(x_train[0], y_train[0])
print(x_index[:10], x_test[:10])

这样我们得到了三个.h5文件,gap_MobileNetV2.h5, gap_InceptionV3.h5, gap_ResNet50.h5,然后读取数据,重新构建模型

#%%
import numpy as np
from tensorflow import keras
import tensorflow as tf
import h5py
import pandas as pd
import os
# 这里在mac上如果需要使用三个模型需要设置这个环境变量,不然报错
# OMP: Error #15: Initializing libomp.dylib, but found libiomp5.dylib already initialized.
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"

x_train = []
y_train = []
x_test = []
x_index = []
for filename in ['gap_InceptionV3.h5', 'gap_ResNet50.h5', 'gap_MobileNetV2.h5']:
    with h5py.File(filename, 'r') as h:
        x_train.append(np.array(h['train']))
        x_test.append(np.array(h['test']))
        y_train = np.array(h['label'])
        x_index = np.array(h['index'])

x_train = np.concatenate(x_train, axis=1)
x_test = np.concatenate(x_test, axis=1)
y_train = np.array(y_train)
x_index = np.array(x_index)
print(x_train.shape, x_test.shape, y_train.shape, x_index.shape)

def unison_shuffled_copies(a, b):
    a = np.array(a)
    b = np.array(b)
    assert len(a) == len(b)
    p = np.random.permutation(len(a))
    return a[p], b[p]

x_train, y_train = unison_shuffled_copies(x_train, y_train)

# 模型很简单,将三个特征向量拼接起来,然后dropout,最后输出sigmoid
model = keras.Sequential([
    keras.layers.Dropout(0.5, input_shape=(x_train.shape[-1],)),
    keras.layers.Dense(1, activation='sigmoid')
])

model.compile(
    loss='binary_crossentropy',
    optimizer=tf.train.AdamOptimizer(),
    metrics=['acc']
)

model.fit(
    x=x_train,
    y=y_train,
    batch_size=128,
    epochs=8,
    validation_split=0.2,
    callbacks=[keras.callbacks.TensorBoard(log_dir='./log')]
)
model.save('model.h5')

#%%
result = model.predict(x_test)

path = './submission.csv'
counter = x_index
result = np.array(result, np.float)
result = np.squeeze(result)

def limit(x):
    if x < 0.005:
        return 0.005
    elif x > 0.995:
        return 0.995
    else:
        return x

df = pd.DataFrame({'id': counter, 'label': result})
df['label'] = df['label'].map(limit)
df = df.sort_values(by='id') # 这里对应上文提到的,根据文件名排序
file = df.to_csv(path_or_buf=None, index=None)
with tf.gfile.Open(path, 'w') as f:
    f.write(file)

print('Mission Accomplished!')

kaggle分数:

0.04071

2.5 自定义Estimator-CIFAR10识别

参考:

高级卷积神经网络 CIFAR-10 ResNet

首先分析一下Google官网的CIFAR-10源码

TnesorFlow-CIFAR10

然后开始修改代码适配自己的环境

generate_cifar10_tfrecords.py少量修改

# ==============================================================================
"""根据cifar10数据集生成TFRecords文件
版本:
    TensorFlow:1.12
    Python:3.6.7
使用keras.datasets.cifar10.load_data()获得数据,训练集中划分后20%作为验证集,
通过TFRecordWriter写入到三个文件中:train.tfrecords, validation.tfrecords, 
eval.tfrecords,运行时参数data_dir指定生成文件的路径。
"""
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division

import os

import numpy as np
import tensorflow as tf
from tensorflow import keras

FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string(
    'data_dir', './cifar10', 'Directory to generate tfrecords to.')
FILE_NAMES = ['train', 'validation', 'eval']


def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def convert_to_tfrecord(x, y, output_file):
    """生成tfrecords"""
    with tf.io.TFRecordWriter(output_file) as writer:
        data_length = len(y)
        for i in range(data_length):
            example = tf.train.Example(features=tf.train.Features(
                feature={
                    # 通过keras获得的数据集的image是uint8类型的数据
                    'image': _bytes_feature(x[i].tobytes()),
                    # 通过keras获得的数据集的label是[xxx, 1]的形状,类型int32,
                    # 需要y[i, 0]获得标签数值,类型转换为int64,
                    # tfrecords只支持Int64List,没有Int32List
                    'label': _int64_feature(y[i, 0].astype(np.int64))
                }))
            writer.write(example.SerializeToString())
    print('Generate {} success!'.format(output_file))


def main(data_dir):
    """
    参数:
        data_dir:tfrecords文件保存路径
    功能:
        主函数,包括生成文件夹,获取数据,划分数据,生成tfrecords文件
    """
    if not os.path.exists(data_dir):
        os.mkdir(data_dir)
    print('Start to generate tfrecords in {}.'.format(data_dir))
    # 调用keras.datasets.cifar10.load_data()获得数据
    (x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
    # 这里划分前80%的数据做训练集,20%验证集,理论上要shuffle,
    # 这里我感觉不shuffle也行
    split_index = int(len(y_train) * 0.8)
    assert len(x_train) == len(y_train)
    val_data = x_train[split_index:], y_train[split_index:]
    train_data = x_train[:split_index], y_train[:split_index]
    eval_data = x_test, y_test
    for mode, data in zip(FILE_NAMES, [train_data, val_data, eval_data]):
        output_file = os.path.join(data_dir, mode + '.tfrecords')
        x, y = data
        try:
            os.remove(output_file)
        except OSError:
            pass
        convert_to_tfrecord(x, y, output_file)
    print('Done!')

if __name__ == '__main__':
    # 通过tensorflow的flags产生运行时参数,简单一些
    main(FLAGS.data_dir)

cifar10.py少量修改

# ==============================================================================
"""生成CIFAR10 Dataset
版本:
    TensorFlow:1.12
    Python:3.6.7
读取tfrecords文件,对图片和标签的数据类型进行调整,对图片进行扰乱处理,比如裁剪、
亮度调整、对比度调整和翻转等操作,shuffle和batch,make_batch返回一个batch的数据,
此部分代码改动较少。
"""
import os

import tensorflow as tf

HEIGHT = 32
WIDTH = 32
DEPTH = 3


class Cifar10DataSet(object):
    """通过一个类来管理dataset"""
    def __init__(self, data_dir, subset='train', use_distortion=True):
        self.data_dir = data_dir
        self.subset = subset
        self.use_distortion = use_distortion

    def get_filenames(self):
        if self.subset in ['train', 'validation', 'eval']:
            return [os.path.join(self.data_dir, self.subset + '.tfrecords')]
        else:
            raise ValueError('Invalid data subset {}'.format(self.subset))

    def parser(self, example):
        """读取tfrecords文件,类型转换,shape调整"""
        features = tf.parse_single_example(
            example, 
            features={
                'image': tf.FixedLenFeature([], tf.string),
                'label': tf.FixedLenFeature([], tf.int64)
            })
        image = tf.decode_raw(features['image'], tf.uint8)
        image.set_shape([DEPTH * HEIGHT * WIDTH])

        image = tf.cast(
            tf.transpose(tf.reshape(image, [DEPTH, HEIGHT, WIDTH]), [1, 2, 0]),
            tf.float32)
        label = tf.cast(features['label'], tf.int32)

        image = self.preprocess(image)
        return image, label

    def preprocess(self, image):
        """对train数据集进行扰乱,包括裁剪、亮度调整、对比度调整和翻转等操作"""
        if self.subset == 'train' and self.use_distortion:
            image = tf.image.resize_image_with_crop_or_pad(image, 40, 40)
            image = tf.random_crop(image, [HEIGHT, WIDTH, DEPTH]) # 裁剪
            image = tf.image.random_flip_left_right(image) # 左右翻转
            # image = tf.image.random_brightness(image, max_delta=10) # 亮度
            # image = tf.image.random_contrast(image, lower=0.2, upper=1.8) # 对比度
            # image = tf.image.random_hue(image, max_delta=0.1) # 色相
            # image = tf.image.random_flip_up_down(image) # 上下翻转
            # image = tf.image.random_saturation(image, 0, 5) # 饱和度
            # image = tf.image.random_jpeg_quality(image, 50, 90) # 噪声,jpeg质量
        return image

    def make_batch(self, batch_size):
        """通过TFRecordDataset读取文件,shuffle和batch数据集,返回一个batch的数据"""
        filenames = self.get_filenames()
        dataset = tf.data.TFRecordDataset(filenames).repeat()
        # num_parallel_calls并行处理,加速IO
        dataset = dataset.map(
            self.parser, num_parallel_calls=batch_size)
        # 缓冲池的大小设计
        if self.subset == 'train':
            min_queue_examples = int(
                Cifar10DataSet.num_examples_per_epoch(self.subset) * 0.4)
            dataset = dataset.shuffle(buffer_size=min_queue_examples + 3 * batch_size)

        dataset = dataset.batch(batch_size)
        iterator = dataset.make_one_shot_iterator()
        image_batch, label_batch = iterator.get_next()

        return image_batch, label_batch

    @staticmethod
    def num_examples_per_epoch(subset='train'):
        if subset == 'train':
            return 40000 # 对源码进行了修改
        elif subset == 'validation':
            return 10000
        elif subset == 'eval':
            return 10000
        else:
            raise ValueError('Invalid data subset "%s"' % subset)

通过下面的代码测试Cifar10DataSet是否正确

import tensorflow as tf
import cifar10
sess = tf.Session()

dataset = cifar10.Cifar10DataSet('./cifar10')
data = dataset.make_batch(16)
img, label = sess.run(data)

print(img.shape, label)

这里我修改成了仅使用CPU训练的模式

model_base.pycifar10_model.py没有修改,cifar10_main_cpu.py大量修改

# ==============================================================================
"""使用CPU进行训练的main文件
版本:
    TensorFlow:1.12
    Python:3.6.7
定义运行时参数,仅使用CPU进行训练和验证
"""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

import os

import cifar10
import cifar10_model
import numpy as np
import tensorflow as tf

tf.logging.set_verbosity(tf.logging.INFO)
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string(
    'data_dir', './cifar10', 'Directory to generate tfrecords to.')
tf.app.flags.DEFINE_string(
    'job_dir', './tmp', 'Directory to generate model to.')
tf.app.flags.DEFINE_integer(
    'train_steps', 1000, 'Train steps.')
tf.app.flags.DEFINE_integer(
    'eval_steps', 100, 'Eval steps.')  # eval_steps * eval_batch_size最好等于eval数据集大小
tf.app.flags.DEFINE_integer(
    'train_batch_size', 128, 'Train batch size.')
tf.app.flags.DEFINE_integer(
    'eval_batch_size', 100, 'Eval batch size.')    
tf.app.flags.DEFINE_integer(
    'num_layers', 44, 'The number of layers of the model.') 
tf.app.flags.DEFINE_float(
    'learning_rate', 0.1, 'Learning rate value.') 
tf.app.flags.DEFINE_integer(
    'decay_steps', 1000, 'The number of learning rate decay steps.')   
tf.app.flags.DEFINE_float(
    'decay_rate', 0.9, 'Decay rate value.')   
tf.app.flags.DEFINE_boolean(
    'use_distortion_for_training', True, 'If doing image distortion for training.') 
tf.app.flags.DEFINE_float(
    'batch_norm_decay', 0.997, 'Decay for batch norm.')   
tf.app.flags.DEFINE_float(
    'batch_norm_epsilon', 1e-5, 'Epsilon for batch norm.')   
tf.app.flags.DEFINE_integer(
    'num_inter_threads', 6, 'Number of threads to use for inter-op parallelism.')
tf.app.flags.DEFINE_integer(
    'num_intra_threads', 6, 'Number of threads to use for intra-op parallelism.')


def get_model_fn():
    """返回Estimator的model_fn"""

    def _resnet_model_fn(features, labels, mode, params):
        """
        返回包含Resnet模型的EstimatorSpec,只有train和evaluate方法,
        没有predict方法,优化器使用Adam,learning rate会自动衰减
        
        Args:
            features:一个batch的image数据
            labels:一个batch的label数据
            mode:调用train还是evaluate
            params:其他运行参数
        Returns:
            tf.estimator.EstimatorSpec
        """
        is_training = (mode == tf.estimator.ModeKeys.TRAIN)
        decay_steps = params['decay_steps'] # 学习率衰减的steps
        decay_rate = params['decay_rate'] # 学习率衰减率
        learning_rate = params['learning_rate']

        loss, preds = _calc_fn(
            is_training, features, labels,
            params['num_layers'], params['batch_norm_decay'],
            params['batch_norm_epsilon'])
        # batch_norm需要更新
        update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
        
        # 使用tf.train.exponential_decay实现学习率衰减,
        # 以默认情况,80000steps后学习率衰减为0.0002,与原始代码近似
        learning_rate = tf.train.exponential_decay(
            learning_rate=learning_rate,
            global_step=tf.train.get_global_step(),
            decay_steps=decay_steps,
            decay_rate=decay_rate
        )

        # tensor_to_log是dict类型,且key为tensor的name
        avg_loss = tf.reduce_mean(loss)
        avg_loss = tf.identity(avg_loss, name='loss')
        tensor_to_log = {'learning_rate': learning_rate, 'loss': avg_loss}
        logging_hook = tf.train.LoggingTensorHook(
            tensors=tensor_to_log, every_n_iter=100)
        
        counter_hook = tf.train.StepCounterHook(every_n_steps=10)

        train_hooks = [logging_hook, counter_hook]

        optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)

        train_op = [
            optimizer.minimize(
                loss, global_step=tf.train.get_global_step())
        ]
        train_op.extend(update_ops)
        train_op = tf.group(*train_op)

        predictions = {
            'classes': preds['classes'],
            'probabilities': preds['probabilities']
        }
        metrics = {
            'accuracy':
                tf.metrics.accuracy(labels, predictions['classes'])
        }
        return tf.estimator.EstimatorSpec(
            mode=mode,
            predictions=predictions,
            loss=loss,
            train_op=train_op,
            training_hooks=train_hooks,
            eval_metric_ops=metrics)

    return _resnet_model_fn


def _calc_fn(is_training, feature, label, 
            num_layers, batch_norm_decay, batch_norm_epsilon):
    """
    获取model,简单计算
    Args:
        is_training:判断是train还是evaluate
        feature:一个batch的image数据
        label:一个batch的label数据
        num_layers:Resnet层数
        batch_norm_decay:Resnet参数
        batch_norm_epsilon:Resnet参数
    Returns:
        loss:一个batch的softmax_cross_entropy
        pred:字典类型包括一个batch的标签和概率
    """
    model = cifar10_model.ResNetCifar10(
        num_layers,
        batch_norm_decay=batch_norm_decay,
        batch_norm_epsilon=batch_norm_epsilon,
        is_training=is_training,
        data_format='channels_last')
    logits = model.forward_pass(feature, input_data_format='channels_last')
    pred = {
        'classes': tf.argmax(input=logits, axis=1),
        'probabilities': tf.nn.softmax(logits)
    }

    loss = tf.losses.sparse_softmax_cross_entropy(
        logits=logits, labels=label)
    return loss, pred


def input_fn(data_dir,
            subset,
            batch_size,
            use_distortion_for_training=True):
    """
    输入函数,可以用于train数据集合eval数据集
    Args:
        data_dir:tfrecords文件所在的文件夹
        subset:判断是train还是evaluate
        batch_size:一个batch的大小
        use_distortion_for_training:是否对数据进行扰动
    Returns:
        image_batch:一个batch的image数据
        label_batch:一个batch的label数据
    """
    use_distortion = subset == 'train' and use_distortion_for_training
    dataset = cifar10.Cifar10DataSet(data_dir, subset, use_distortion)
    image_batch, label_batch = dataset.make_batch(batch_size)
    return image_batch, label_batch


def main(flags):
    # 为了调用多线程运行,需要使用tf.ConfigProto,
    # device_count指定最多使用多少devices,比如CPU,最多仅支持1;
    # 如果有多个GPU,可以指定最多使用其中的多少个,键值对形式
    # intra_op_parallelism_threads 控制运算符op内部的并行
    # inter_op_parallelism_threads 控制多个运算符op之间的并行计算
    # 线程数最好根据CPU的核心数来决定
    run_config = tf.ConfigProto(
        device_count={"CPU": 1},
        intra_op_parallelism_threads=flags.num_intra_threads,
        inter_op_parallelism_threads=flags.num_inter_threads)
    # tf.ConfigProto不能直接添加到Estimator中,
    # 需要使用tf.estimator.RunConfig包裹一下,顺便指定模型存储路径model_dir
    config = tf.estimator.RunConfig(
        model_dir=flags.job_dir,
        session_config=run_config)
    # tf.estimator.Estimator的params必须是dict类型
    classifier = tf.estimator.Estimator(
        model_fn=get_model_fn(),
        config=config,
        params={
            'decay_steps': flags.decay_steps,
            'decay_rate': flags.decay_rate,
            'num_layers': flags.num_layers,
            'batch_norm_decay': flags.batch_norm_decay,
            'batch_norm_epsilon': flags.batch_norm_epsilon,
            'train_batch_size': flags.train_batch_size,
            'learning_rate': flags.learning_rate
        })
    # 循环多次以观察eval的变化,防止过拟合
    for _ in range(50):
        classifier.train(input_fn=lambda: input_fn(
            flags.data_dir, 'train', flags.train_batch_size), 
            steps=flags.train_steps)
        classifier.evaluate(input_fn=lambda: input_fn(
            flags.data_dir, 'eval', flags.eval_batch_size),
            steps=flags.eval_steps)


if __name__ == '__main__':
    if not os.path.exists(FLAGS.data_dir):
        os.mkdir(FLAGS.data_dir)
    if not os.path.exists(FLAGS.job_dir):
        os.mkdir(FLAGS.job_dir)        
    main(FLAGS)    

首先通过generate_cifar10_tfrecords.py生成tfrecords文件,然后运行cifar10_main_cpu.py,同时可以自行指定各种参数,也可以使用默认值。


修改代码以适配多GPU环境,cifar10_main_gpu.py在源码的基础上小作修改

# ==============================================================================
"""使用GPU进行训练的main文件,包括分布式,实际功能未测试
版本:
    TensorFlow:1.12
    Python:3.6.7
定义运行时参数,仅使用GPU进行训练和验证
"""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

import os
import itertools

import cifar10
import cifar10_model
import numpy as np
import tensorflow as tf

tf.logging.set_verbosity(tf.logging.INFO)
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string(
    'data_dir', './cifar10', 'Directory to generate tfrecords to.')
tf.app.flags.DEFINE_string(
    'job_dir', './tmp1', 'Directory to generate model to.')
tf.app.flags.DEFINE_string(
    'variable_strategy', 'CPU', 'Where to locate variable operations')
tf.app.flags.DEFINE_integer(
    'train_steps', 20000, 'Train steps.')
tf.app.flags.DEFINE_integer(
    'num_gpus', 0, 'The number of gpus used. Uses only CPU if set to 0.')
tf.app.flags.DEFINE_integer(
    'eval_steps', 100, 'Eval steps.')  # eval_steps * eval_batch_size最好等于eval数据集大小
tf.app.flags.DEFINE_integer(
    'train_batch_size', 128, 'Train batch size.')
tf.app.flags.DEFINE_integer(
    'eval_batch_size', 100, 'Eval batch size.')    
tf.app.flags.DEFINE_integer(
    'num_layers', 44, 'The number of layers of the model.') 
tf.app.flags.DEFINE_float(
    'learning_rate', 0.1, 'Learning rate value.') 
tf.app.flags.DEFINE_float(
    'weight_decay', 2e-4, 'Weight decay for convolutions.') 
tf.app.flags.DEFINE_integer(
    'decay_steps', 2000, 'The number of learning rate decay steps.')   
tf.app.flags.DEFINE_float(
    'decay_rate', 0.96, 'Decay rate value.')   
tf.app.flags.DEFINE_string(
    'data_format', None, """If not set, the data format best for the training device is used. 
    Allowed values: channels_first (NCHW) channels_last (NHWC).""")
tf.app.flags.DEFINE_boolean(
    'log_device_placement', False, 'Whether to log device placement.') 
tf.app.flags.DEFINE_boolean(
    'sync', False, 'If present when running in a distributed environment will run on sync mode.') 
tf.app.flags.DEFINE_boolean(
    'use_distortion_for_training', True, 'If doing image distortion for training.') 
tf.app.flags.DEFINE_float(
    'batch_norm_decay', 0.997, 'Decay for batch norm.')   
tf.app.flags.DEFINE_float(
    'batch_norm_epsilon', 1e-5, 'Epsilon for batch norm.')   
tf.app.flags.DEFINE_integer(
    'num_inter_threads', 6, 'Number of threads to use for inter-op parallelism.')
tf.app.flags.DEFINE_integer(
    'num_intra_threads', 6, 'Number of threads to use for intra-op parallelism.')


def get_model_fn(num_gpus, variable_strategy, num_workers):
    """返回Estimator的model_fn"""

    def _resnet_model_fn(features, labels, mode, params):
        """
        返回包含Resnet模型的EstimatorSpec,只有train和evaluate方法,
        没有predict方法,优化器使用Adam,learning rate会自动衰减
        
        Args:
            features:一个batch的image数据
            labels:一个batch的label数据
            mode:调用train还是evaluate
            params:其他运行参数
        Returns:
            tf.estimator.EstimatorSpec
        """
        is_training = (mode == tf.estimator.ModeKeys.TRAIN)
        decay_steps = params['decay_steps'] # 学习率衰减的steps
        decay_rate = params['decay_rate'] # 学习率衰减率
        learning_rate = params['learning_rate']
        weight_decay = params['weight_decay']
        # 多GPU需要分别计算不同设备的loss和梯度,再综合起来
        tower_features = features
        tower_labels = labels
        tower_losses = []
        tower_gradvars = []
        tower_preds = []

        data_format = params['data_format']
        if not data_format:
            if num_gpus == 0:
                data_format = 'channels_last'
            else:
                data_format = 'channels_first'

        if num_gpus == 0:
            num_devices = 1
            device_type = 'cpu'
        else:
            num_devices = num_gpus
            device_type = 'gpu'
        # Todo GPU部分代码没有测试,不知道是不是对的
        for i in range(num_devices):
            worker_device = '/{}:{}'.format(device_type, i)
            if variable_strategy == 'CPU':
                device_setter = tf.train.replica_device_setter(
                    worker_device=worker_device)
            elif variable_strategy == 'GPU':
                device_setter = tf.train.replica_device_setter(
                    worker_device=worker_device,
                    ps_strategy=tf.contrib.training.GreedyLoadBalancingStrategy(
                        num_gpus, tf.contrib.training.byte_size_load_fn))
            with tf.variable_scope('resnet', reuse=bool(i != 0)):
                with tf.name_scope('tower_%d' % i) as name_scope:
                    with tf.device(device_setter):
                        loss, gradvars, preds = _calc_fn(
                            is_training, weight_decay, tower_features[i], 
                            tower_labels[i], data_format, params['num_layers'], 
                            params['batch_norm_decay'], params['batch_norm_epsilon'])
                        tower_losses.append(loss)
                        tower_gradvars.append(gradvars)
                        tower_preds.append(preds)
                        if i == 0:
                            # batch_norm需要更新
                            update_ops = tf.get_collection(
                                tf.GraphKeys.UPDATE_OPS, name_scope)
        
        gradvars = []
        with tf.name_scope('gradient_averaging'):
            all_grads = {}
            for grad, var in itertools.chain(*tower_gradvars):
                if grad is not None:
                    all_grads.setdefault(var, []).append(grad)
            for var, grads in all_grads.items():
                with tf.device(var.device):
                    if len(grads) == 1:
                        avg_grad = grads[0]
                    else:
                        avg_grad = tf.multiply(tf.add_n(grads), 1. / len(grads))
                gradvars.append((avg_grad, var))

        consolidation_device = '/gpu:0' if variable_strategy == 'GPU' else '/cpu:0'
        with tf.device(consolidation_device):
            # 使用tf.train.exponential_decay实现学习率衰减
            learning_rate = tf.train.exponential_decay(
                learning_rate=learning_rate,
                global_step=tf.train.get_global_step(),
                decay_steps=decay_steps,
                decay_rate=decay_rate
            )
            loss = tf.reduce_mean(tower_losses, name='loss')

            # tensor_to_log是dict类型,且key为tensor的name
            tensor_to_log = {'learning_rate': learning_rate, 'loss': loss}
            logging_hook = tf.train.LoggingTensorHook(
                tensors=tensor_to_log, every_n_iter=100)
        
            counter_hook = tf.train.StepCounterHook(every_n_steps=20)

            train_hooks = [logging_hook, counter_hook]

            optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
            # Todo,分布式代码没有测试
            if params['sync']:
                optimizer = tf.train.SyncReplicasOptimizer(
                    optimizer, replicas_to_aggregate=num_workers)
                sync_replicas_hook = optimizer.make_session_run_hook(params['is_chief'])
                train_hooks.append(sync_replicas_hook)
            train_op = [
                optimizer.apply_gradients(
                    gradvars, global_step=tf.train.get_global_step())
            ]
            train_op.extend(update_ops)
            train_op = tf.group(*train_op)

            predictions = {
                'classes': 
                    tf.concat([p['classes'] for p in tower_preds], axis=0),
                'probabilities': 
                    tf.concat([p['probabilities'] for p in tower_preds], axis=0)
            }
            stacked_labels = tf.concat(labels, axis=0)

            metrics = {
                'accuracy':
                    tf.metrics.accuracy(stacked_labels, predictions['classes'])
            }
        return tf.estimator.EstimatorSpec(
            mode=mode,
            predictions=predictions,
            loss=loss,
            train_op=train_op,
            training_hooks=train_hooks,
            eval_metric_ops=metrics)

    return _resnet_model_fn


def _calc_fn(is_training, weight_decay, feature, label, data_format,
            num_layers, batch_norm_decay, batch_norm_epsilon):
    """
    获取model,简单计算
    Args:
        is_training:判断是train还是evaluate
        weight_decay:l2损失系数
        feature:一个batch的image数据
        label:一个batch的label数据
        data_format:channels_last (NHWC) or channels_first (NCHW)
        num_layers:Resnet层数
        batch_norm_decay:Resnet参数
        batch_norm_epsilon:Resnet参数
    Returns:
        loss:一个batch的softmax_cross_entropy
        pred:字典类型包括一个batch的标签和概率
    """
    model = cifar10_model.ResNetCifar10(
        num_layers,
        batch_norm_decay=batch_norm_decay,
        batch_norm_epsilon=batch_norm_epsilon,
        is_training=is_training,
        data_format=data_format)
    logits = model.forward_pass(feature, input_data_format='channels_last')
    pred = {
        'classes': tf.argmax(input=logits, axis=1),
        'probabilities': tf.nn.softmax(logits)
    }

    loss = tf.losses.sparse_softmax_cross_entropy(
        logits=logits, labels=label)
    loss = tf.reduce_mean(loss)

    model_params = tf.trainable_variables()
    loss += weight_decay * tf.add_n([tf.nn.l2_loss(v) for v in model_params])
    grad = tf.gradients(loss, model_params)
    return loss, zip(grad, model_params), pred


def input_fn(data_dir,
            subset,
            num_shards,
            batch_size,
            use_distortion_for_training=True):
    """
    输入函数,可以用于train数据集合eval数据集
    Args:
        data_dir:tfrecords文件所在的文件夹
        subset:判断是train还是evaluate
        batch_size:一个batch的大小
        use_distortion_for_training:是否对数据进行扰动
    Returns:
        image_batch:一个batch的image数据
        label_batch:一个batch的label数据
    """
    with tf.device('/cpu:0'):
        use_distortion = subset == 'train' and use_distortion_for_training
        dataset = cifar10.Cifar10DataSet(data_dir, subset, use_distortion)
        image_batch, label_batch = dataset.make_batch(batch_size)
        if num_shards <= 1:
            return [image_batch], [label_batch] # 必须返回list,对应_calc_fn的参数
        # 均分训练数据给不同的设备
        image_batch = tf.unstack(image_batch, num=batch_size, axis=0)
        label_batch = tf.unstack(label_batch, num=batch_size, axis=0)
        feature_shards = [[] for i in range(num_shards)]
        label_shards = [[] for i in range(num_shards)]
        for i in range(batch_size):
            idx = i % num_shards
            feature_shards[idx].append(image_batch[i])
            label_shards[idx].append(label_batch[i])
        feature_shards = [tf.parallel_stack(x) for x in feature_shards]
        label_shards = [tf.parallel_stack(x) for x in label_shards]
        return feature_shards, label_shards

def main(flags):
    # The env variable is on deprecation path, default is set to off.
    os.environ['TF_SYNC_ON_FINISH'] = '0'
    os.environ['TF_ENABLE_WINOGRAD_NONFUSED'] = '1'
    # 为了调用多线程运行,需要使用tf.ConfigProto,
    # device_count指定最多使用多少devices,比如CPU,最多仅支持1;
    # 如果有多个GPU,可以指定最多使用其中的多少个,键值对形式
    # intra_op_parallelism_threads 控制运算符op内部的并行
    # inter_op_parallelism_threads 控制多个运算符op之间的并行计算
    run_config = tf.ConfigProto(
        device_count={"CPU": 1, "GPU": 0},
        allow_soft_placement=True, # GPU显存相关,自动增加
        log_device_placement=flags.log_device_placement,
        gpu_options=tf.GPUOptions(force_gpu_compatible=True),
        intra_op_parallelism_threads=flags.num_intra_threads,
        inter_op_parallelism_threads=flags.num_inter_threads)
    # tf.ConfigProto不能直接添加到Estimator中,
    # 需要使用tf.estimator.RunConfig包裹一下,顺便指定模型存储路径model_dir
    config = tf.estimator.RunConfig(
        model_dir=flags.job_dir,
        session_config=run_config)
    # tf.estimator.Estimator的params必须是dict类型
    classifier = tf.estimator.Estimator(
        model_fn=get_model_fn(
            flags.num_gpus, 
            flags.variable_strategy, 
            config.num_worker_replicas or 1),
        config=config,
        params={
            'decay_steps': flags.decay_steps,
            'decay_rate': flags.decay_rate,
            'num_layers': flags.num_layers,
            'weight_decay': flags.weight_decay,
            'batch_norm_decay': flags.batch_norm_decay,
            'batch_norm_epsilon': flags.batch_norm_epsilon,
            'train_batch_size': flags.train_batch_size,
            'learning_rate': flags.learning_rate,
            'data_format': flags.data_format,
            'sync': flags.sync,
            'is_chief':config.is_chief
        })
    # 循环多次以观察eval的变化,防止过拟合
    for _ in range(3):
        classifier.train(input_fn=lambda: input_fn(
            flags.data_dir, 'train', flags.num_gpus, flags.train_batch_size), 
            steps=flags.train_steps)
        classifier.evaluate(input_fn=lambda: input_fn(
            flags.data_dir, 'eval', flags.num_gpus, flags.eval_batch_size),
            steps=flags.eval_steps)


if __name__ == '__main__':
    if not os.path.exists(FLAGS.data_dir):
        os.mkdir(FLAGS.data_dir)
    if not os.path.exists(FLAGS.job_dir):
        os.mkdir(FLAGS.job_dir)      
    # 下面是对参数的一些约束,比如使用GPU数量与ResNet网络层数的逻辑约束等
    if FLAGS.num_gpus > 0:
        assert tf.test.is_gpu_available(), 'Requested GPUs but none found.'
    if FLAGS.num_gpus < 0:
        raise ValueError(
        'Invalid GPU count: \"--num-gpus\" must be 0 or a positive integer.')
    if FLAGS.num_gpus == 0 and FLAGS.variable_strategy == 'GPU':
        raise ValueError('num-gpus=0, CPU must be used as parameter server. Set'
                     '--variable-strategy=CPU.')
    if (FLAGS.num_layers - 2) % 6 != 0:
        raise ValueError('Invalid --num-layers parameter.')
    if FLAGS.num_gpus != 0 and FLAGS.train_batch_size % FLAGS.num_gpus != 0:
        raise ValueError('--train-batch-size must be multiple of --num-gpus.')
    if FLAGS.num_gpus != 0 and FLAGS.eval_batch_size % FLAGS.num_gpus != 0:
        raise ValueError('--eval-batch-size must be multiple of --num-gpus.')
    if cifar10.Cifar10DataSet.num_examples_per_epoch('eval') % FLAGS.eval_batch_size != 0:
        raise ValueError('validation set size must be multiple of eval_batch_size')

    tf.app.run(main(FLAGS))