from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.layers import Concatenate, Conv2D, Input, UpSampling2D
from tensorflow.keras.models import Model

def Unet(input_shape=(256, 256, 3), num_classes=21):
    inputs = Input(input_shape)
    # VGG16 is a project-local backbone that returns the five stage feature maps
    # (see the sketch after this function); the spatial sizes in the comments
    # below assume a 512x512 input.
    feat1, feat2, feat3, feat4, feat5 = VGG16(inputs)
    channels = [64, 128, 256, 512]

    # 32, 32, 512 -> 64, 64, 512
    P5_up = UpSampling2D(size=(2, 2))(feat5)
    # 64, 64, 512 + 64, 64, 512 -> 64, 64, 1024
    P4 = Concatenate(axis=3)([feat4, P5_up])
    # 64, 64, 1024 -> 64, 64, 512
    P4 = Conv2D(channels[3], 3, activation='relu', padding='same', kernel_initializer=RandomNormal(stddev=0.02))(P4)
    P4 = Conv2D(channels[3], 3, activation='relu', padding='same', kernel_initializer=RandomNormal(stddev=0.02))(P4)

    # 64, 64, 512 -> 128, 128, 512
    P4_up = UpSampling2D(size=(2, 2))(P4)
    # 128, 128, 256 + 128, 128, 512 -> 128, 128, 768
    P3 = Concatenate(axis=3)([feat3, P4_up])
    # 128, 128, 768 -> 128, 128, 256
    P3 = Conv2D(channels[2], 3, activation='relu', padding='same', kernel_initializer=RandomNormal(stddev=0.02))(P3)
    P3 = Conv2D(channels[2], 3, activation='relu', padding='same', kernel_initializer=RandomNormal(stddev=0.02))(P3)

    # 128, 128, 256 -> 256, 256, 256
    P3_up = UpSampling2D(size=(2, 2))(P3)
    # 256, 256, 128 + 256, 256, 256 -> 256, 256, 384
    P2 = Concatenate(axis=3)([feat2, P3_up])
    # 256, 256, 384 -> 256, 256, 128
    P2 = Conv2D(channels[1], 3, activation='relu', padding='same', kernel_initializer=RandomNormal(stddev=0.02))(P2)
    P2 = Conv2D(channels[1], 3, activation='relu', padding='same', kernel_initializer=RandomNormal(stddev=0.02))(P2)

    # 256, 256, 128 -> 512, 512, 128
    P2_up = UpSampling2D(size=(2, 2))(P2)
    # 512, 512, 64 + 512, 512, 128 -> 512, 512, 192
    P1 = Concatenate(axis=3)([feat1, P2_up])
    # 512, 512, 192 -> 512, 512, 64
    P1 = Conv2D(channels[0], 3, activation='relu', padding='same', kernel_initializer=RandomNormal(stddev=0.02))(P1)
    P1 = Conv2D(channels[0], 3, activation='relu', padding='same', kernel_initializer=RandomNormal(stddev=0.02))(P1)

    # 512, 512, 64 -> 512, 512, num_classes, per-pixel softmax over the classes
    P1 = Conv2D(num_classes, 1, activation="softmax")(P1)

    model = Model(inputs=inputs, outputs=P1)
    return model
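The VGG16 called above is not keras.applications.VGG16; it is a project-local backbone that returns the feature map of each of the five convolutional stages. A minimal sketch of such a backbone, assuming the standard VGG16 convolutional trunk (the exact layer layout is an assumption, and pretrained-weight handling is omitted):

from tensorflow.keras.layers import Conv2D, MaxPooling2D

def VGG16(inputs):
    # Block 1: 64 channels -> feat1 at full resolution
    x = Conv2D(64, 3, activation='relu', padding='same')(inputs)
    x = Conv2D(64, 3, activation='relu', padding='same')(x)
    feat1 = x
    x = MaxPooling2D(2)(x)

    # Block 2: 128 channels -> feat2 at 1/2 resolution
    x = Conv2D(128, 3, activation='relu', padding='same')(x)
    x = Conv2D(128, 3, activation='relu', padding='same')(x)
    feat2 = x
    x = MaxPooling2D(2)(x)

    # Block 3: 256 channels -> feat3 at 1/4 resolution
    x = Conv2D(256, 3, activation='relu', padding='same')(x)
    x = Conv2D(256, 3, activation='relu', padding='same')(x)
    x = Conv2D(256, 3, activation='relu', padding='same')(x)
    feat3 = x
    x = MaxPooling2D(2)(x)

    # Block 4: 512 channels -> feat4 at 1/8 resolution
    x = Conv2D(512, 3, activation='relu', padding='same')(x)
    x = Conv2D(512, 3, activation='relu', padding='same')(x)
    x = Conv2D(512, 3, activation='relu', padding='same')(x)
    feat4 = x
    x = MaxPooling2D(2)(x)

    # Block 5: 512 channels -> feat5 at 1/16 resolution (no pooling after,
    # since the decoder starts upsampling from here)
    x = Conv2D(512, 3, activation='relu', padding='same')(x)
    x = Conv2D(512, 3, activation='relu', padding='same')(x)
    x = Conv2D(512, 3, activation='relu', padding='same')(x)
    feat5 = x
    return feat1, feat2, feat3, feat4, feat5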
# step0: parameter configuration
# dataset_path = r"G:\deep_learning_data\EG_dataset\voc_format"  # local (Windows)
dataset_path = os.path.join(BASE_DIR, "..", "data", "dataset", "voc_format")  # linux
model_path = os.path.join(BASE_DIR, "data", "model_data", "unet_voc.h5")

max_epoch = 100  # total number of training epochs
Batch_size = 1
inputs_size = [224, 224, 3]
num_classes = 2  # number of output channels, background included: 1 + 1 = 2 in this example
lr = 1e-4
decay_rate = 0.95  # exponential decay factor applied to the learning rate after each epoch
import datetime

curr_time = datetime.datetime.now()
time_str = datetime.datetime.strftime(curr_time, '%Y_%m_%d_%H_%M_%S')
loss_history = LossHistory("logs/", time_str)
log_dir = os.path.join(BASE_DIR, "logs", "loss_" + time_str)
print("Log directory: {}".format(log_dir))
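LossHistory is another project-local helper. A minimal sketch of one way to implement it, assuming it only appends per-epoch losses to text files under the log directory (the class interface, including the append_loss method, is an assumption):

import os

class LossHistory:
    """Minimal loss logger: appends one scalar loss per line to text files."""
    def __init__(self, log_root, time_str):
        self.save_path = os.path.join(log_root, "loss_" + time_str)
        os.makedirs(self.save_path, exist_ok=True)

    def append_loss(self, loss, val_loss):
        # Hypothetical interface: called once per epoch with scalar losses.
        with open(os.path.join(self.save_path, "epoch_loss.txt"), "a") as f:
            f.write("{}\n".format(loss))
        with open(os.path.join(self.save_path, "epoch_val_loss.txt"), "a") as f:
            f.write("{}\n".format(val_loss))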
# step1: dataset creation
with open(os.path.join(dataset_path, "ImageSets/Segmentation/train.txt"), "r") as f:
    train_lines = f.readlines()
with open(os.path.join(dataset_path, "ImageSets/Segmentation/val.txt"), "r") as f:
    val_lines = f.readlines()

epoch_size = len(train_lines) // Batch_size    # iterations per training epoch
epoch_size_val = len(val_lines) // Batch_size  # iterations per validation epoch
# wrap the generators in tf.data datasets
gen = Generator(Batch_size, train_lines, inputs_size, num_classes, dataset_path)
gen = tf.data.Dataset.from_generator(partial(gen.generate, random_data=True), (tf.float32, tf.float32))
gen = gen.shuffle(buffer_size=Batch_size).prefetch(buffer_size=Batch_size)

gen_val = Generator(Batch_size, val_lines, inputs_size, num_classes, dataset_path)
gen_val = tf.data.Dataset.from_generator(partial(gen_val.generate, random_data=False), (tf.float32, tf.float32))
gen_val = gen_val.shuffle(buffer_size=Batch_size).prefetch(buffer_size=Batch_size)
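Generator is also project-local. All that from_generator requires is that gen.generate be a Python generator yielding (image_batch, label_batch) tuples. Here is a minimal sketch under that assumption; the VOC directory layout and one-hot label encoding mirror common VOC pipelines but are assumptions, and the random_data=True augmentation branch is stubbed out:

import os
import numpy as np
from PIL import Image

class Generator:
    """Minimal sketch: yields (image_batch, one_hot_label_batch) forever."""
    def __init__(self, batch_size, lines, inputs_size, num_classes, dataset_path):
        self.batch_size = batch_size
        self.lines = lines
        self.inputs_size = inputs_size  # [H, W, C]
        self.num_classes = num_classes
        self.dataset_path = dataset_path

    def generate(self, random_data=True):
        h, w = self.inputs_size[0], self.inputs_size[1]
        i = 0
        while True:
            images, labels = [], []
            for _ in range(self.batch_size):
                name = self.lines[i].strip()
                jpg = Image.open(os.path.join(self.dataset_path, "JPEGImages", name + ".jpg")).convert("RGB")
                png = Image.open(os.path.join(self.dataset_path, "SegmentationClass", name + ".png"))
                # real implementation: random scale/flip/jitter when random_data=True
                jpg = jpg.resize((w, h))
                png = png.resize((w, h))
                images.append(np.array(jpg, np.float32) / 255.0)
                seg = np.array(png)
                seg[seg >= self.num_classes] = self.num_classes - 1  # clamp ignore labels
                labels.append(np.eye(self.num_classes)[seg])         # one-hot: H, W, num_classes
                i = (i + 1) % len(self.lines)
            yield np.stack(images), np.stack(labels)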
if epoch_size == 0 or epoch_size_val == 0:
    raise ValueError("Dataset is too small to train on; please enlarge the dataset.")
# step2: create the model
model = Unet(inputs_size, num_classes)  # pass the input size and the number of output channels
model.load_weights(model_path, by_name=True, skip_mismatch=True)  # load pretrained weights, skipping mismatched layers
# step3: create the loss and the optimizer
loss = CE()
# decay_steps=epoch_size with staircase=True means lr = lr * decay_rate ** epoch,
# i.e. one 5% decay after each full epoch
lr_schedule = ExponentialDecay(initial_learning_rate=lr, decay_steps=epoch_size, decay_rate=decay_rate, staircase=True)
optimizer = Adam(learning_rate=lr_schedule)
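CE is the project's cross-entropy loss factory. Since the network ends in a per-pixel softmax and the labels are one-hot, a minimal sketch is plain categorical cross-entropy averaged over all pixels (the clipping constant is an assumption for numerical stability):

import tensorflow as tf

def CE():
    def _ce(y_true, y_pred):
        # y_true, y_pred: (batch, H, W, num_classes); y_pred is already softmax output
        y_pred = tf.clip_by_value(y_pred, 1e-7, 1.0 - 1e-7)
        ce = -tf.reduce_sum(y_true * tf.math.log(y_pred), axis=-1)  # per-pixel cross-entropy
        return tf.reduce_mean(ce)                                   # mean over batch and pixels
    return _ce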
# step4: training loop
for epoch in range(max_epoch):
    fit_one_epoch(model, loss, optimizer, epoch, epoch_size, epoch_size_val, gen, gen_val, max_epoch, get_train_step_fn())
    # note: the file name only depends on time_str, so each epoch overwrites the same checkpoint
    path_model = os.path.join(log_dir, "model_weight_{}.h5".format(time_str))
    model.save_weights(path_model)
    print("Epoch:{}, model saved at: {}".format(epoch, path_model))
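fit_one_epoch and get_train_step_fn come from the project's training utilities and are not shown here. A minimal sketch of what the returned train step could look like under TF2 tf.function semantics (the argument order is an assumption):

import tensorflow as tf

def get_train_step_fn():
    @tf.function
    def train_step(images, labels, model, loss_fn, optimizer):
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss_value = loss_fn(labels, predictions)
        grads = tape.gradient(loss_value, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        return loss_value
    return train_step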