引言
你这个模型, 它复现起来难吗?
我一水博客的, 能给你看复现不出来的算法?
效果展示
将梵高的代表作<星空>的风格, 迁移到视频上
准备工作
- 华强买瓜原版视频, 不需要音频. 毕竟处理的图像不包含音频信息. 最好也不要包含字幕, 字幕会影响整体的效果. 音频和字幕可以后期用PR剪辑上去
- 风格图像, 随便挑几幅, 最好是印象派的画作, 非常具有风格
- Python3.6.7
- Tensorflow2.0.0
- OpenCV-Python3.4.2.16
具体步骤
让我们新建一个HuaQiangBuyWatermelon 的项目…
业务逻辑
原来我计划的流程是这样的
原视频 = 读取视频
写入视频 = 视频写入对象
风格图像 = 读取风格图像
for each 当前帧 in 原视频
转换帧 = 转换风格(当前帧, 风格图像)
写入视频(转换帧)
end for
但是过程中OpenCV 的VideoWriter 出了点问题, 写不进去. 无奈只能采用以下的逻辑
for each 当前帧 in 原视频
保存(当前帧.jpg)
end for
for each 当前图像 in 所有保存的帧
转换帧 = 转换风格(当前图像, 风格图像)
保存(转换帧)
end for
最后得到了每一帧的经过风格转换的图像, 再借助PR将这些所有的转换帧剪成视频
日志
整个模型虽然简单, 但是非常耗时, 我截取的华强买瓜视频一共6900帧. 为了DEBUG方便, 日志是一个好习惯.
新建logger.py 文件
为了方便DEBUG, 我会在每行日志的最前面显示输出的类型, 比如INFO, WARNING, ERROR
import time
logger = open('./log.txt', 'a+')
def log_string(out_str, dtype='INFO', to_log_file=True):
"""
保存日志
:param to_log_file:
:param dtype:
:param out_str:
"""
global logger
if not isinstance(out_str, str):
out_str = str(out_str)
if '\\r' == out_str:
print()
if to_log_file:
logger.write('\n')
logger.flush()
else:
local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
out_str = '[{:^7s}] [{}] '.format(dtype, local_time) + out_str
if 'INFO' == dtype:
print(out_str)
elif 'WARNING' == dtype:
print("\033[0;34m{}\033[0m".format(out_str))
else:
print("\033[0;31m{}\033[0m".format(out_str))
if to_log_file:
logger.write(out_str + '\n')
logger.flush()
保存的日志大概如下:
转换风格
这里用到了神经网络, 简单来说就是通过一个深层的VGG 网络去提取图像更深层的信息, 难度在于损失函数的设计, 这里不赘述, 感兴趣的读者请查阅其它文献.
train() 接受4个参数:
- content_image 待转换的图像
- style_image 风格图像, 将这张图像的风格迁移到content_image上
- num_epochs epoch的数量, 训练的周期数
- remain 剩余的待处理的图像数量, 用来估计剩余时间ETA
该函数返回风格转换完毕的图像
新建train.py 文件
def train(content_image, style_image, num_epochs, remain):
def clip_0_1(image_):
return tf.clip_by_value(image_, clip_value_min=0.0, clip_value_max=1.0)
def style_content_loss(outputs):
style_outputs = outputs['style']
content_outputs = outputs['content']
style_loss = tf.add_n([tf.reduce_mean((style_outputs[name] - style_targets[name]) ** 2)
for name in style_outputs.keys()])
style_loss *= style_weight / num_style_layers
content_loss = tf.add_n([tf.reduce_mean((content_outputs[name] - content_targets[name]) ** 2)
for name in content_outputs.keys()])
content_loss *= content_weight / num_content_layers
loss = style_loss + content_loss
return loss
@tf.function()
def train_step(image_):
with tf.GradientTape() as tape:
outputs = extractor(image_)
loss = style_content_loss(outputs)
grad = tape.gradient(loss, image_)
opt.apply_gradients([(grad, image_)])
image_.assign(clip_0_1(image_))
content_layers = ['block5_conv2']
style_layers = ['block1_conv1',
'block2_conv1',
'block3_conv1',
'block4_conv1',
'block5_conv1']
num_content_layers = len(content_layers)
num_style_layers = len(style_layers)
extractor = StyleContentModel(style_layers, content_layers)
style_targets = extractor(style_image)['style']
content_targets = extractor(content_image)['content']
image = tf.Variable(content_image)
opt = tf.optimizers.Adam(learning_rate=0.02, beta_1=0.99, epsilon=1e-1)
style_weight = 1e-2
content_weight = 1e4
start = time.time()
epochs = num_epochs
steps_per_epoch = 50
step = 0
for n in range(epochs):
start2 = time.time()
for m in range(steps_per_epoch):
step += 1
train_step(image)
end2 = time.time()
log_string('time of epoch {}: {:.2f}s'.format(n+1, end2-start2))
end = time.time()
log_string("Total time: {:.2f}s, ETA: {:.2f}min".format(end - start, remain*(end - start)/60))
return image[0].numpy()
其中用到的各种函数如下
from __future__ import absolute_import, division, print_function, unicode_literals
import time
from abc import ABC
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from logger import log_string
mpl.rcParams['figure.figsize'] = (13, 10)
mpl.rcParams['axes.grid'] = False
def load_img(path_to_img):
max_dim = 512
img = tf.io.read_file(path_to_img)
img = tf.image.decode_jpeg(img)
img = tf.image.convert_image_dtype(img, tf.float32)
shape = tf.cast(tf.shape(img)[:-1], tf.float32)
long = max(shape)
scale = max_dim / long
new_shape = tf.cast(shape * scale, tf.int32)
img = tf.image.resize(img, new_shape)
img = img[tf.newaxis, :]
return img
def vgg_layers(layer_names):
""" Creates a vgg model that returns a list of intermediate output values."""
vgg = tf.keras.applications.VGG19(include_top=False, weights='imagenet')
vgg.trainable = False
outputs = [vgg.get_layer(name).output for name in layer_names]
model = tf.keras.Model([vgg.input], outputs)
return model
def gram_matrix(input_tensor):
result = tf.linalg.einsum('bijc,bijd->bcd', input_tensor, input_tensor)
input_shape = tf.shape(input_tensor)
num_locations = tf.cast(input_shape[1] * input_shape[2], tf.float32)
return result / num_locations
class StyleContentModel(tf.keras.models.Model, ABC):
def __init__(self, style_layers_, content_layers_):
super(StyleContentModel, self).__init__()
self.vgg = vgg_layers(style_layers_ + content_layers_)
self.style_layers = style_layers_
self.content_layers = content_layers_
self.num_style_layers = len(style_layers_)
self.vgg.trainable = False
def call(self, input_, **kwargs):
input_ = input_ * 255.0
preprocessed_input = tf.keras.applications.vgg19.preprocess_input(input_)
outputs = self.vgg(preprocessed_input)
style_outputs, content_outputs = (
outputs[:self.num_style_layers],
outputs[self.num_style_layers:])
style_outputs = [gram_matrix(style_output)
for style_output in style_outputs]
content_dict = {content_name: value
for content_name, value
in zip(self.content_layers, content_outputs)}
style_dict = {style_name: value
for style_name, value
in zip(self.style_layers, style_outputs)}
return {'content': content_dict, 'style': style_dict}
读取视频并保存每一帧
新建generate.py 文件
这里使用OpenCV 的VideoCapture 读取视频, 然后一帧一帧保存下来
这个函数只要运行一次就行了, 然后就可以注释掉了
def split_frames():
cap = cv.VideoCapture('./video/video.mp4')
cnt = 0
while True:
ret, frame = cap.read()
if not ret:
break
cnt += 1
log_string('saving frame {}'.format(cnt))
cv.imwrite('./contents/{}.jpg'.format(cnt), frame)
cap.release()
log_string('end')
log_string('\\r')
log_string('\\r')
最后, 再将保存好的这些原视频的帧, 一张一张传递给train() 函数即可.
但是这里需要注意, os.listdir() 返回的目录下的所有文件, 是按文件名的字典序排序的, 也就是说, 假如有1.jpg, 2.jpg, …, 20.jpg 这20个图像, 它返回的结果是
1.jpg, 11.jpg, 12.jpg, 13.jpg, 14.jpg, 15.jpg, 16.jpg, 17.jpg, 18.jpg, 19.jpg,
2.jpg, 21.jpg, 22.jpg, 23.jpg, 24.jpg, 25.jpg, 26.jpg, 27.jpg, 28.jpg, 29.jpg
因此保险起见, 最好对它重新排个序(当然不排序的话, 只要按原文件名去保存转换好的图像, 也是可以的, 但是为啥推荐做排序, 假设处理某一帧的时候出错了, 那么排序后能够快速定位到这一帧的索引, 方便后续DEBUG)
files = os.listdir(content_dir)
num_frames = len(files)
files.sort(key=lambda x: int(x[:-4]))
这里需要将x.jpg 中的最后4个字符.jpg 去掉, 只留下一个int 型的x , 然后升序排序.
def generate():
style_img = load_img('./styles/denoised_starry.jpg')
content_dir = './contents/'
files = os.listdir(content_dir)
num_frames = len(files)
files.sort(key=lambda x: int(x[:-4]))
num_epochs = 5
begin = args.begin-1
end = args.end
cnt = begin
for cur in range(begin, end):
file = files[cur]
log_string('-' * 60)
cnt += 1
save_name = './generates/{}.jpg'.format(cnt)
log_string('cnt: {}/{}, current image: {}'.format(cnt, num_frames, file))
content_img = load_img('{}/{}'.format(content_dir, file))
img_styled = train(content_img, style_img, num_epochs, num_frames - cnt) * 255
log_string('saving to {}'.format(save_name))
img_styled = cv.cvtColor(img_styled, cv.COLOR_RGB2BGR)
cv.imwrite(save_name, img_styled)
log_string('-' * 60)
log_string('\\r')
if __name__ == '__main__':
split_frames()
问题
由于整个模型需要的时间非常长, 过程中万一出现异常, 那就直接终止了, 比如你睡前开始运行…结果半小时后它碰到异常停了…那一晚上就浪费了…为了避免重新跑带来的麻烦, 需要提高点鲁棒性. 最容易发生的问题其实是OOM 爆显存. 一旦发生了任何问题, 整个程序就停止了, 这时候就体现出来日志的重要性了, 查看日志可以快速地定位到模型处理到哪一帧, 然后后期再补上这一帧.
其实只需要给generate.py 用try catch 捕捉异常即可. 等模型跑完之后, 查看日志, 看看有多少个ERROR , 重新跑一遍这些帧即可, 不用把所有的6900帧都跑一遍.
新建main.py
import os
from logger import log_string
if __name__ == '__main__':
steps = 50
for begin in range(135, 6835, steps):
cmd = 'python generate.py -begin {} -end {}'.format(begin, begin+steps-1)
try:
log_string('run {}'.format(cmd))
os.system(cmd)
except:
log_string('cmd {} failed'.format(cmd), 'ERROR')
最后运行的时候只需要运行main.py 即可. 由于加了try catch , 碰到异常也不会终止进程. 发生异常的时候会在日志里面记录, 最后只要等它跑完, 去日志里面定位ERROR , 然后把这些缺失的帧补上就行.
|