Reference: https://github.com/AFAgarap/autoencoders/blob/master/notebooks/0-autoencoder.ipynb
Implementation 1
"""Implementation of vanila autoencoder in TensorFlow 2.0 Subclassing API"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
__version__ = "1.0.0"
__author__ = "Abien Fred Agarap"
import tensorflow as tf
class Encoder(tf.keras.layers.Layer):
    def __init__(self, intermediate_dim=128, code_dim=64):
        super(Encoder, self).__init__()
        self.hidden_layer = tf.keras.layers.Dense(
            units=intermediate_dim, activation=tf.nn.relu
        )
        self.output_layer = tf.keras.layers.Dense(
            units=code_dim, activation=tf.nn.sigmoid
        )

    def call(self, input_features):
        activation = self.hidden_layer(input_features)
        return self.output_layer(activation)


class Decoder(tf.keras.layers.Layer):
    def __init__(self, original_dim, code_dim=64):
        super(Decoder, self).__init__()
        self.hidden_layer = tf.keras.layers.Dense(units=code_dim, activation=tf.nn.relu)
        self.output_layer = tf.keras.layers.Dense(
            units=original_dim, activation=tf.nn.sigmoid
        )

    def call(self, code):
        activation = self.hidden_layer(code)
        return self.output_layer(activation)


class Autoencoder(tf.keras.Model):
    def __init__(self, code_dim=64, intermediate_dim=128, original_dim=784):
        super(Autoencoder, self).__init__()
        self.loss = []
        self.encoder = Encoder(code_dim=code_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(code_dim=code_dim, original_dim=original_dim)

    def call(self, features):
        code = self.encoder(features)
        reconstructed = self.decoder(code)
        return reconstructed
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
__author__ = "Richard Ricardo, Abien Fred Agarap"
__version__ = "1.0.0"
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
import tensorflow_datasets as tfds
tf.compat.v1.enable_eager_execution()
SEED = 42
tf.random.set_seed(SEED)
np.random.seed(SEED)
BATCH_SIZE = 64
EPOCHS = 10
train_dataset = tfds.load("mnist", split=tfds.Split.TRAIN)
def normalize(example):
    features = example["image"]
    features = tf.reshape(features, [-1, 784])
    features = tf.cast(features, tf.float32)
    features = features / 255.
    return features, features
train_dataset = train_dataset.map(normalize)
train_dataset = train_dataset.shuffle(1024)
train_dataset = train_dataset.batch(BATCH_SIZE, True)
train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)
model = Autoencoder()
for batch_features, _ in train_dataset.take(1):
    model(batch_features)
    break
model.summary()
The model summary is as follows:
model.compile(loss=tf.losses.mean_squared_error,
optimizer=tf.optimizers.Adam(learning_rate=1e-2))
history = model.fit(train_dataset, epochs=EPOCHS, verbose=2)
The training output is as follows:
loss = history.history["loss"]
sns.set_style("darkgrid")
plt.figure(figsize=(8, 8))
plt.plot(loss)
plt.ylabel("Mean Squared Error (MSE)")
plt.xlabel("Epoch")
plt.title("Training Loss")
plt.show()
test_dataset = tfds.load("mnist", split=tfds.Split.TEST, batch_size=-1)
test_dataset = tfds.as_numpy(test_dataset)
test_features = test_dataset["image"]
test_features = test_features.astype("float32") / 255.
number = 10
plt.figure(figsize=(20, 4))
for index in range(number):
    # original test image on the top row
    ax = plt.subplot(2, number, index + 1)
    test_image = test_features[index]
    test_image = test_image.reshape(28, 28)
    plt.imshow(test_image)
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
    # reconstruction on the bottom row
    ax = plt.subplot(2, number, index + 1 + number)
    reconstructed = model(test_features[index].reshape(-1, 784))
    reconstructed = reconstructed.numpy().reshape(28, 28)
    plt.imshow(reconstructed)
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()
The reconstruction results are as follows:
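Beyond the visual comparison above, the reconstruction error on the held-out split can also be measured with the compiled loss. A minimal sketch, assuming the test split is passed through the same normalize() mapping used for training (this is not part of the original notebook):

test_ds = tfds.load("mnist", split=tfds.Split.TEST)
test_ds = test_ds.map(normalize).batch(BATCH_SIZE)
# evaluate() returns the compiled mean-squared reconstruction error
test_mse = model.evaluate(test_ds, verbose=0)
print("Test reconstruction MSE:", test_mse)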
Implementation 2
Reference: https://blog.csdn.net/flame_alone/article/details/106469112. Create the ae_images folder (the path used by save_images below) in advance to store the reconstructed images, otherwise saving will fail with an error.
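Alternatively, the folder can be created from the script itself; a one-line sketch using the standard library (assuming the script runs from the directory where ae_images should live):

import os
os.makedirs('ae_images', exist_ok=True)  # create the output folder if it does not already exist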
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import Sequential, layers
from PIL import Image
from matplotlib import pyplot as plt
tf.random.set_seed(42)
np.random.seed(42)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
h_dim = 36
batchsz = 512
epochs = 50
lr = tf.linspace(0.001, 0.001, epochs)  # per-epoch learning rates (constant here)
buffersize = batchsz * 5
(x_train, y_train), (x_val, y_val) = keras.datasets.fashion_mnist.load_data()
x_train, x_val = x_train.astype(np.float32) / 255., x_val.astype(np.float32) / 255.
train_db = tf.data.Dataset.from_tensor_slices(x_train)
train_db = train_db.shuffle(batchsz * 5).batch(batchsz)
val_db = tf.data.Dataset.from_tensor_slices(x_val)
val_db = val_db.batch(batchsz)
class AutoEncoder(keras.Model):
    def __init__(self, input_shape, hidden_list, activation=tf.nn.relu):
        super(AutoEncoder, self).__init__()
        # the last entry of hidden_list is the bottleneck (code) size
        center = hidden_list.pop()
        self.encoder = Sequential([layers.Dense(num, activation=activation) for num in hidden_list]
                                  + [layers.Dense(center)])
        hidden_list.reverse()
        self.decoder = Sequential([layers.Dense(num, activation=activation) for num in hidden_list]
                                  + [layers.Dense(input_shape)])

    def call(self, inputs, training=None):
        h = self.encoder(inputs)
        x_hat = self.decoder(h)
        return x_hat
def train(train_db, val_db, input_shape=784):
    model = AutoEncoder(input_shape, [392, 196, 98, 36])
    model.build(input_shape=(None, input_shape))
    model.summary()
    train_list = []
    val_list = []
    for epoch in range(epochs):
        # a fresh optimizer per epoch, using that epoch's learning rate
        optimizer = tf.optimizers.Adam(lr=lr[epoch])
        train_losses = 0
        val_losses = 0
        for step, x in enumerate(train_db):
            x = tf.reshape(x, [-1, input_shape])
            with tf.GradientTape() as tape:
                x_rec_logits = tf.sigmoid(model(x))
                rec_loss = tf.losses.mean_squared_error(x, x_rec_logits)
                rec_loss = tf.reduce_mean(rec_loss)
            train_losses += rec_loss
            grads = tape.gradient(rec_loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
        for x in val_db:
            x = tf.reshape(x, [-1, input_shape])
            x_rec_logits = tf.sigmoid(model(x))
            rec_loss = tf.losses.mean_squared_error(x, x_rec_logits)
            rec_loss = tf.reduce_mean(rec_loss)
            val_losses += rec_loss
        print(epoch, "train_losses :", float(train_losses), "val_losses :", float(val_losses))
        train_list.append(train_losses)
        val_list.append(val_losses)
    model.save('/tmp/model')
    x = [i for i in range(0, epochs)]
    plt.figure()
    plt.plot(x, train_list, color='blue', label='training')
    plt.plot(x, val_list, color='red', label='validation')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()
    plt.close()
def save_images(imgs, name, shape=(32, 32)):
    new_im = Image.new('L', (28 * shape[0], 28 * shape[1]))
    index = 0
    for i in range(0, 28 * shape[0], 28):
        for j in range(0, 28 * shape[1], 28):
            im = imgs[index]
            im = Image.fromarray(im, mode='L')
            new_im.paste(im, (i, j))
            index += 1
    new_im.save(name)
def showImage(dataset, input_shape=784):
    model = tf.keras.models.load_model('/tmp/model', compile=False)
    for step, x in enumerate(dataset):
        x_hat = tf.sigmoid(model(tf.reshape(x, [-1, input_shape])))
        x_hat = tf.reshape(x_hat, [-1, 28, 28])
        x_concat = tf.concat([x, x_hat], axis=0)
        if x_concat.shape[0] < batchsz * 2:
            break
        x_concat = x_concat.numpy() * 255.
        x_concat = x_concat.astype(np.uint8)
        shape = (int(tf.sqrt(batchsz * 2.)), int(tf.sqrt(batchsz * 2.)))
        save_images(x_concat, 'ae_images/rec_epoch_%d.png' % step, shape)
train(train_db, val_db)
showImage(val_db)
Testing TensorFlow on the GPU
"""Implementation of vanila autoencoder in TensorFlow 2.0 Subclassing API"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
__version__ = "1.0.0"
__author__ = "Abien Fred Agarap"
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
import tensorflow as tf
class Encoder(tf.keras.layers.Layer):
    def __init__(self, intermediate_dim=128, code_dim=64):
        super(Encoder, self).__init__()
        self.hidden_layer = tf.keras.layers.Dense(
            units=intermediate_dim, activation=tf.nn.relu
        )
        self.output_layer = tf.keras.layers.Dense(
            units=code_dim, activation=tf.nn.sigmoid
        )

    def call(self, input_features):
        activation = self.hidden_layer(input_features)
        return self.output_layer(activation)


class Decoder(tf.keras.layers.Layer):
    def __init__(self, original_dim, code_dim=64):
        super(Decoder, self).__init__()
        self.hidden_layer = tf.keras.layers.Dense(units=code_dim, activation=tf.nn.relu)
        self.output_layer = tf.keras.layers.Dense(
            units=original_dim, activation=tf.nn.sigmoid
        )

    def call(self, code):
        activation = self.hidden_layer(code)
        return self.output_layer(activation)


class Autoencoder(tf.keras.Model):
    def __init__(self, code_dim=64, intermediate_dim=128, original_dim=784):
        super(Autoencoder, self).__init__()
        self.loss = []
        self.encoder = Encoder(code_dim=code_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(code_dim=code_dim, original_dim=original_dim)

    def call(self, features):
        code = self.encoder(features)
        reconstructed = self.decoder(code)
        return reconstructed
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
__author__ = "Richard Ricardo, Abien Fred Agarap"
__version__ = "1.0.0"
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
import tensorflow_datasets as tfds
tf.compat.v1.enable_eager_execution()
tf.config.experimental.set_memory_growth(
tf.config.experimental.list_physical_devices('GPU')[0], True
)
SEED = 42
tf.random.set_seed(SEED)
np.random.seed(SEED)
BATCH_SIZE = 64
EPOCHS = 10
train_dataset = tfds.load("mnist", split=tfds.Split.TRAIN)
def normalize(example):
    features = example["image"]
    features = tf.reshape(features, [-1, 784])
    features = tf.cast(features, tf.float32)
    features = features / 255.
    return features, features
train_dataset = train_dataset.map(normalize)
train_dataset = train_dataset.shuffle(1024)
train_dataset = train_dataset.batch(BATCH_SIZE, True)
train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)
model = Autoencoder()
for batch_features, _ in train_dataset.take(1):
    model(batch_features)
    break
model.summary()
The model summary is as follows:
model.compile(loss=tf.losses.mean_squared_error,
optimizer=tf.optimizers.Adam(learning_rate=1e-2))
history = model.fit(train_dataset, epochs=EPOCHS, verbose=2)
The training output is as follows. I am not sure why the GPU run is slower than the local CPU run, but at least I can confirm that it is running on the GPU, because I can observe it from the terminal: GPU[0] is my process. Before starting the program I had checked that GPU 0 is the one the program uses. I also found that merely closing the Jupyter tab does not actually stop the process on the GPU; a notebook is only truly terminated after clicking Shutdown in the Jupyter interface, which is worth keeping in mind.
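Device placement can also be confirmed from inside the notebook rather than from the terminal, by listing the visible GPUs and inspecting the device of a tensor produced by the model; a minimal sketch (not part of the original notebook):

print(tf.config.experimental.list_physical_devices('GPU'))  # the GPU(s) TensorFlow can see
sample = tf.random.normal([4, 784])
print(model(sample).device)  # should report a /device:GPU:0 placement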
loss = history.history["loss"]
sns.set_style("darkgrid")
plt.figure(figsize=(8, 8))
plt.plot(loss)
plt.ylabel("Mean Squared Error (MSE)")
plt.xlabel("Epoch")
plt.title("Training Loss")
plt.show()
test_dataset = tfds.load("mnist", split=tfds.Split.TEST, batch_size=-1)
test_dataset = tfds.as_numpy(test_dataset)
test_features = test_dataset["image"]
test_features = test_features.astype("float32") / 255.
number = 10
plt.figure(figsize=(20, 4))
for index in range(number):
    # original test image on the top row
    ax = plt.subplot(2, number, index + 1)
    test_image = test_features[index]
    test_image = test_image.reshape(28, 28)
    plt.imshow(test_image)
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
    # reconstruction on the bottom row
    ax = plt.subplot(2, number, index + 1 + number)
    reconstructed = model(test_features[index].reshape(-1, 784))
    reconstructed = reconstructed.numpy().reshape(28, 28)
    plt.imshow(reconstructed)
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()
The reconstruction results are as follows: