引包
tf.distribute.MirroredStrategy 的用法
import tensorflow_datasets as tfds
import tensorflow as tf
tfds.disable_progress_bar()
import os
数据准备
下载MNIST在TensorFlow Datasets 加载。
将 with_info 设置为 True 会包含整个数据集的元数据,其中这些数据集将保存在 info 中。除此之外,该元数据对象包括训练和测试示例的数量。
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets['train'], datasets['test']
定义分配策略
创建一个 MirroredStrategy 对象。这将处理分配策略,并提供一个上下文管理器(tf.distribute.MirroredStrategy.scope)来构建你的模型。
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
设定输入管道
num_train_examples = info.splits['train'].num_examples
num_test_examples = info.splits['test'].num_examples
BUFFER_SIZE = 10000
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
归一化
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
打乱数据,训练集测试集做一下设置
train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)
模型准备
在strategy.scope() 上下文里创建模型
with strategy.scope():
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(),
metrics=['accuracy'])
定义回调
- TensorBoard: 此回调(callbacks)为 TensorBoard 写入日志,允许您可视化图形。
- Model Checkpoint: 此回调(callbacks)在每个 epoch 后保存模型。
- Learning Rate Scheduler:使用此回调(callbacks),您可以安排学习率在每个 epoch/batch 之后更改。
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
def decay(epoch):
if epoch < 3:
return 1e-3
elif epoch >= 3 and epoch < 7:
return 1e-4
else:
return 1e-5
class PrintLR(tf.keras.callbacks.Callback):
def on_epoch_end(self, epoch, logs=None):
print('\nLearning rate for epoch {} is {}'.format(epoch + 1,
model.optimizer.lr.numpy()))
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir='./logs'),
tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
save_weights_only=True),
tf.keras.callbacks.LearningRateScheduler(decay),
PrintLR()
]
跑起来
要查看模型的执行方式,请加载最新的检查点(checkpoint)并在测试数据上调用 evaluate 。使用适当的数据集调用 evaluate 。
model.fit(train_dataset, epochs=12, callbacks=callbacks)
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
eval_loss, eval_acc = model.evaluate(eval_dataset)
print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
保存模型
path = 'saved_model/'
model.save(path, save_format='tf')
在无需 strategy.scope 加载模型。
unreplicated_model = tf.keras.models.load_model(path)
unreplicated_model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(),
metrics=['accuracy'])
eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset)
print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
在含 strategy.scope 加载模型。
with strategy.scope():
replicated_model = tf.keras.models.load_model(path)
replicated_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(),
metrics=['accuracy'])
eval_loss, eval_acc = replicated_model.evaluate(eval_dataset)
print ('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
|