import urllib.request
import os
import tarfile

url = "http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz"
filepath = "D:/AI/data/CIFAR-10/cifar-10-python.tar.gz"
if not os.path.isfile(filepath):
    result = urllib.request.urlretrieve(url, filepath)
    print("Download: ", result)
else:
    print("Data file already exists!")

if not os.path.exists("D:/AI/data/CIFAR-10/cifar-10-batches-py"):
    tfile = tarfile.open("D:/AI/data/CIFAR-10/cifar-10-python.tar.gz", "r:gz")
    result = tfile.extractall("D:/AI/data/CIFAR-10/")
    print("Extracted to D:/AI/data/CIFAR-10/cifar-10-batches-py/")
else:
    print("Directory already exists!")
Data file already exists!
Directory already exists!
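To guard against a corrupted download, the archive's checksum can be compared with the one published on the CIFAR-10 page; a minimal sketch using the standard hashlib module:

# Optional integrity check: print the archive's MD5 and compare it
# against the checksum listed on the CIFAR-10 download page.
import hashlib
with open(filepath, "rb") as f:
    print("MD5:", hashlib.md5(f.read()).hexdigest())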
import numpy as np
import pickle as p

def load_CIFAR_batch(filename):
    """Load one CIFAR-10 batch file and return (images, labels)."""
    with open(filename, "rb") as f:
        data_dict = p.load(f, encoding = "bytes")
        images = data_dict[b"data"]
        labels = data_dict[b"labels"]
        # Each row is 3072 bytes: 1024 red, then 1024 green, then 1024 blue.
        # Reshape to (N, C, H, W), then transpose to (N, H, W, C).
        images = images.reshape(10000, 3, 32, 32)
        images = images.transpose(0, 2, 3, 1)
        labels = np.array(labels)
        return images, labels

def load_CIFAR_data(data_dir):
    images_train = []
    labels_train = []
    for i in range(5):
        f = os.path.join(data_dir, "data_batch_%d" % (i + 1))
        print("Loading", f)
        image_batch, label_batch = load_CIFAR_batch(f)
        images_train.append(image_batch)
        labels_train.append(label_batch)
    Xtrain = np.concatenate(images_train)
    Ytrain = np.concatenate(labels_train)
    del image_batch, label_batch
    Xtest, Ytest = load_CIFAR_batch(os.path.join(data_dir, "test_batch"))
    print("Finished loading CIFAR-10 data")
    return Xtrain, Ytrain, Xtest, Ytest
data_dir = "D:/AI/data/CIFAR-10/cifar-10-batches-py/"
Xtrain, Ytrain, Xtest, Ytest = load_CIFAR_data(data_dir)
Loading D:/AI/data/CIFAR-10/cifar-10-batches-py/data_batch_1
Loading D:/AI/data/CIFAR-10/cifar-10-batches-py/data_batch_2
Loading D:/AI/data/CIFAR-10/cifar-10-batches-py/data_batch_3
Loading D:/AI/data/CIFAR-10/cifar-10-batches-py/data_batch_4
Loading D:/AI/data/CIFAR-10/cifar-10-batches-py/data_batch_5
Finished loading CIFAR-10 data
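As a quick sanity check, the five training batches of 10,000 images each should concatenate into the shapes below (the expected values follow from the reshape and transpose in load_CIFAR_batch):

print("Xtrain:", Xtrain.shape)  # expected (50000, 32, 32, 3)
print("Ytrain:", Ytrain.shape)  # expected (50000,)
print("Xtest: ", Xtest.shape)   # expected (10000, 32, 32, 3)
print("Ytest: ", Ytest.shape)   # expected (10000,)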
%matplotlib inline
import matplotlib.pyplot as plt
plt.imshow(Xtrain[6])
print(Ytrain[6])
label_dict = {0: "airplane", 1: "automobile", 2: "bird", 3: "cat", 4: "deer",
              5: "dog", 6: "frog", 7: "horse", 8: "ship", 9: "truck"}

def plot_images_labels_prediction(images, labels, prediction, idx, num = 10):
    fig = plt.gcf()
    fig.set_size_inches(12, 6)
    if num > 10:
        num = 10
    for i in range(0, num):
        ax = plt.subplot(2, 5, i + 1)
        ax.imshow(images[idx], cmap = "binary")
        title = str(i) + ", " + label_dict[labels[idx]]
        if len(prediction) > 0:
            title += " => " + label_dict[prediction[idx]]
        ax.set_title(title, fontsize = 10)
        idx += 1
    plt.show()

plot_images_labels_prediction(Xtest, Ytest, [], 20, 10)
Xtrain_normalize = Xtrain.astype("float32") / 255.0
Xtest_normalize = Xtest.astype("float32") / 255.0
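Dividing by 255 scales pixel values from [0, 255] into [0, 1]; a quick check (0.0 and 1.0 are the expected extremes across the full training set):

print(Xtrain_normalize.min(), Xtrain_normalize.max())  # expected 0.0 1.0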
from sklearn.preprocessing import OneHotEncoder

# Note: newer scikit-learn releases rename this parameter to
# sparse_output; sparse = False is the older spelling.
encoder = OneHotEncoder(sparse = False)
yy = [[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]]
encoder.fit(yy)
Ytrain_reshape = Ytrain.reshape(-1, 1)
Ytrain_onehot = encoder.transform(Ytrain_reshape)
Ytest_reshape = Ytest.reshape(-1, 1)
Ytest_onehot = encoder.transform(Ytest_reshape)
Ytrain_reshape[0: 10]
array([[6],
       [9],
       [9],
       [4],
       [1],
       [1],
       [2],
       [7],
       [8],
       [3]])
Ytrain_onehot[0: 10]
array([[ 0.,  0.,  0.,  0.,  0.,  0.,  1.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  1.],
       [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  1.],
       [ 0.,  0.,  0.,  0.,  1.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  1.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  1.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  1.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  1.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  1.,  0.],
       [ 0.,  0.,  0.,  1.,  0.,  0.,  0.,  0.,  0.,  0.]])
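The same one-hot encoding can also be produced without scikit-learn by indexing an identity matrix with the label array; an equivalent sketch:

# Alternative one-hot encoding with plain NumPy (equivalent result,
# not the approach used above).
Ytrain_onehot_np = np.eye(10, dtype = "float32")[Ytrain]
Ytest_onehot_np = np.eye(10, dtype = "float32")[Ytest]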
import tensorflow as tf

# Helper functions for building the network (TensorFlow 1.x API).
def weight(shape):
    # Small random initial weights from a truncated normal distribution.
    return tf.Variable(tf.truncated_normal(shape, stddev = 0.1), name = "W")

def bias(shape):
    return tf.Variable(tf.constant(0.1, shape = shape), name = "b")

def conv2d(x, W):
    # Stride 1 with "SAME" padding preserves the spatial dimensions.
    return tf.nn.conv2d(x, W, strides = [1, 1, 1, 1], padding = "SAME")

def max_pool_2x2(x):
    # 2x2 max-pooling halves the spatial dimensions.
    return tf.nn.max_pool(x, ksize = [1, 2, 2, 1], strides = [1, 2, 2, 1], padding = "SAME")
with tf.name_scope("input_layer"):
x = tf.placeholder("float32", shape = [None, 32, 32, 3], name = "x")
with tf.name_scope("conv_1"):
W1 = weight([3, 3, 3, 32])
b1 = bias([32])
conv_1 = conv2d(x, W1) + b1
conv_1 = tf.nn.relu(conv_1)
with tf.name_scope("pool_1"):
pool_1 = max_pool_2x2(conv_1)
with tf.name_scope("conv_2"):
W2 = weight([3, 3, 32, 64])
b2 = bias([64])
conv_2 = conv2d(pool_1, W2) + b2
conv_2 = tf.nn.relu(conv_2)
with tf.name_scope("pool_2"):
pool_2 = max_pool_2x2(conv_2)
with tf.name_scope("fcn"):
W3 = weight([4096, 128])
b3 = bias([128])
flat = tf.reshape(pool_2, [-1, 4096])
h = tf.nn.relu(tf.matmul(flat, W3) + b3)
h_dropout = tf.nn.dropout(h, keep_prob = 0.8)
with tf.name_scope("output_layer"):
W4 = weight([128, 10])
b4 = bias([10])
pred = tf.nn.softmax(tf.matmul(h_dropout, W4) + b4)
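Tracing a 32x32x3 input through these layers shows where the 4096-unit flatten size comes from (each value follows from the strides and pool sizes above):

# Shape trace for one image:
#   input:  32 x 32 x 3
#   conv_1: 32 x 32 x 32  ("SAME" padding, stride 1)
#   pool_1: 16 x 16 x 32  (2x2 max-pool)
#   conv_2: 16 x 16 x 64
#   pool_2:  8 x  8 x 64  -> flattened to 8 * 8 * 64 = 4096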
with tf.name_scope("optimizer"):
y = tf.placeholder("float32", shape = [None, 10], name = "label")
loss_function = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits = pred, labels = y))
optimizer = tf.train.AdamOptimizer(learning_rate = 0.0001).minimize(loss_function)
with tf.name_scope("evaluation"):
correct_prediction = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float32"))
import time

train_epochs = 1
batch_size = 50
total_batch = int(len(Xtrain) / batch_size)
epoch_list = []
accuracy_list = []
loss_list = []

# Non-trainable counter so the completed epoch survives in checkpoints.
epoch = tf.Variable(0, name = "epoch", trainable = False)
startTime = time.time()

sess = tf.Session()
init = tf.global_variables_initializer()
sess.run(init)

ckpt_dir = "D:/AI/log/CIFAR10_log"
if not os.path.exists(ckpt_dir):
    os.makedirs(ckpt_dir)
saver = tf.train.Saver(max_to_keep = 1)
ckpt = tf.train.latest_checkpoint(ckpt_dir)
if ckpt is not None:
    saver.restore(sess, ckpt)
else:
    print("Training from scratch.")
start = sess.run(epoch)
print("Training starts from {} epoch".format(start + 1))
def get_train_batch(number, batch_size):
    return Xtrain_normalize[number * batch_size: (number + 1) * batch_size], \
           Ytrain_onehot[number * batch_size: (number + 1) * batch_size]
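get_train_batch walks the training set in its stored order; shuffling once per epoch generally improves training. A minimal sketch (perm is a hypothetical addition, not used elsewhere in this section):

# Hypothetical per-epoch shuffle: permute images and labels together
# so each (image, label) pair stays aligned.
perm = np.random.permutation(len(Xtrain_normalize))
Xtrain_normalize = Xtrain_normalize[perm]
Ytrain_onehot = Ytrain_onehot[perm]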
for ep in range(start, train_epochs):
    for i in range(total_batch):
        batch_x, batch_y = get_train_batch(i, batch_size)
        sess.run(optimizer, feed_dict = {x: batch_x, y: batch_y})
        if i % 100 == 0:
            print("Step {}".format(i), "Finished!")
    loss, acc = sess.run([loss_function, accuracy], feed_dict = {x: batch_x, y: batch_y})
    epoch_list.append(ep + 1)
    loss_list.append(loss)
    accuracy_list.append(acc)
    print("Train epoch: ", (ep + 1), "Loss = ", loss, "Accuracy = ", acc)
    # Save a checkpoint inside ckpt_dir (os.path.join supplies the path
    # separator that plain string concatenation would drop) and record
    # the completed epoch number.
    saver.save(sess, os.path.join(ckpt_dir, "CIFAR10_cnn_model.ckpt"), global_step = ep + 1)
    sess.run(epoch.assign(ep + 1))

duration = time.time() - startTime
print("Train finished takes ", duration)
Training from scratch.
Training starts from 1 epoch
Step 0 Finished!
Step 100 Finished!
Step 200 Finished!
Step 300 Finished!
Step 400 Finished!
Step 500 Finished!
Step 600 Finished!
Step 700 Finished!
Step 800 Finished!
Step 900 Finished!
Train epoch:  1 Loss =  2.21336 Accuracy =  0.2
Train finished takes  47.71810054779053
%matplotlib inline
fig = plt.gcf()
fig.set_size_inches(4, 2)
plt.plot(epoch_list, loss_list, label = "loss")
plt.ylabel("loss")
plt.xlabel("epoch")
plt.legend(["loss"], loc = "upper right")
<matplotlib.legend.Legend at 0x2141ea0ba20>
%matplotlib inline
fig = plt.gcf()
fig.set_size_inches(4, 2)
plt.plot(epoch_list, accuracy_list, label = "accuracy")
plt.ylabel("accuracy")
plt.xlabel("epoch")
plt.legend(["accuracy"], loc = "upper right")
<matplotlib.legend.Legend at 0x21426202438>
test_pred, acc = sess.run([pred, accuracy], feed_dict = {x: Xtest_normalize, y: Ytest_onehot})
print("在测试集上的精度为", acc)
在测试集上的精度为 0.6266
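Feeding all 10,000 test images in one run works for CIFAR-10, but for larger sets the evaluation can be batched; a sketch reusing batch_size from above:

# Batched evaluation, averaging per-batch accuracy (exact here because
# batch_size divides the test set evenly).
test_batches = len(Xtest_normalize) // batch_size
acc_sum = 0.0
for i in range(test_batches):
    bx = Xtest_normalize[i * batch_size: (i + 1) * batch_size]
    by = Ytest_onehot[i * batch_size: (i + 1) * batch_size]
    acc_sum += sess.run(accuracy, feed_dict = {x: bx, y: by})
print("Batched test accuracy:", acc_sum / test_batches)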
prediction_result = sess.run(tf.argmax(test_pred, 1))
print(prediction_result[0: 20])
label_result = sess.run(tf.argmax(Ytest_onehot, 1))
print(label_result[0: 20])
[3 1 1 0 4 6 1 6 3 1 0 9 3 7 9 6 5 7 8 6]
[3 8 8 0 6 6 1 6 3 1 0 9 5 7 9 8 5 7 8 6]
plot_images_labels_prediction(Xtest, Ytest, prediction_result, 20, 10)