引言
你这个模型, 它复现起来难吗?
我一水博客的, 能给你看复现不出来的算法?
效果展示
将梵高的代表作<星空>的风格, 迁移到视频上
  
准备工作
- 华强买瓜原版视频, 不需要音频. 毕竟处理的图像不包含音频信息. 最好也不要包含字幕, 字幕会影响整体的效果. 音频和字幕可以后期用PR剪辑上去
- 风格图像, 随便挑几幅, 最好是印象派的画作, 非常具有风格
- Python3.6.7
- Tensorflow2.0.0
- OpenCV-Python3.4.2.16
具体步骤
让我们新建一个HuaQiangBuyWatermelon的项目…
业务逻辑
原来我计划的流程是这样的
原视频 = 读取视频
写入视频 = 视频写入对象
风格图像 = 读取风格图像
for each 当前帧 in 原视频
转换帧 = 转换风格(当前帧, 风格图像)
写入视频(转换帧)
end for
但是过程中OpenCV的VideoWriter出了点问题, 写不进去. 无奈只能采用以下的逻辑
for each 当前帧 in 原视频
保存(当前帧.jpg)
end for
for each 当前图像 in 所有保存的帧
转换帧 = 转换风格(当前图像, 风格图像)
保存(转换帧)
end for
最后得到了每一帧的经过风格转换的图像, 再借助PR将这些所有的转换帧剪成视频
日志
整个模型虽然简单, 但是非常耗时, 我截取的华强买瓜视频一共6900帧. 为了DEBUG方便, 日志是一个好习惯.
新建logger.py文件
为了方便DEBUG, 我会在每行日志的最前面显示输出的类型, 比如INFO, WARNING, ERROR
import time
logger = open('./log.txt', 'a+')
def log_string(out_str, dtype='INFO', to_log_file=True):
"""
保存日志
:param to_log_file:
:param dtype:
:param out_str:
"""
global logger
if not isinstance(out_str, str):
out_str = str(out_str)
if '\\r' == out_str:
print()
if to_log_file:
logger.write('\n')
logger.flush()
else:
local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
out_str = '[{:^7s}] [{}] '.format(dtype, local_time) + out_str
if 'INFO' == dtype:
print(out_str)
elif 'WARNING' == dtype:
print("\033[0;34m{}\033[0m".format(out_str))
else:
print("\033[0;31m{}\033[0m".format(out_str))
if to_log_file:
logger.write(out_str + '\n')
logger.flush()
保存的日志大概如下:

转换风格
这里用到了神经网络, 简单来说就是通过一个深层的VGG网络去提取图像更深层的信息, 难度在于损失函数的设计, 这里不赘述, 感兴趣的读者请查阅其它文献.
train()接受4个参数:
- content_image 待转换的图像
- style_image 风格图像, 将这张图像的风格迁移到content_image上
- num_epochs epoch的数量, 训练的周期数
- remain 剩余的待处理的图像数量, 用来估计剩余时间ETA
该函数返回风格转换完毕的图像
新建train.py文件
def train(content_image, style_image, num_epochs, remain):
def clip_0_1(image_):
return tf.clip_by_value(image_, clip_value_min=0.0, clip_value_max=1.0)
def style_content_loss(outputs):
style_outputs = outputs['style']
content_outputs = outputs['content']
style_loss = tf.add_n([tf.reduce_mean((style_outputs[name] - style_targets[name]) ** 2)
for name in style_outputs.keys()])
style_loss *= style_weight / num_style_layers
content_loss = tf.add_n([tf.reduce_mean((content_outputs[name] - content_targets[name]) ** 2)
for name in content_outputs.keys()])
content_loss *= content_weight / num_content_layers
loss = style_loss + content_loss
return loss
@tf.function()
def train_step(image_):
with tf.GradientTape() as tape:
outputs = extractor(image_)
loss = style_content_loss(outputs)
grad = tape.gradient(loss, image_)
opt.apply_gradients([(grad, image_)])
image_.assign(clip_0_1(image_))
content_layers = ['block5_conv2']
style_layers = ['block1_conv1',
'block2_conv1',
'block3_conv1',
'block4_conv1',
'block5_conv1']
num_content_layers = len(content_layers)
num_style_layers = len(style_layers)
extractor = StyleContentModel(style_layers, content_layers)
style_targets = extractor(style_image)['style']
content_targets = extractor(content_image)['content']
image = tf.Variable(content_image)
opt = tf.optimizers.Adam(learning_rate=0.02, beta_1=0.99, epsilon=1e-1)
style_weight = 1e-2
content_weight = 1e4
start = time.time()
epochs = num_epochs
steps_per_epoch = 50
step = 0
for n in range(epochs):
start2 = time.time()
for m in range(steps_per_epoch):
step += 1
train_step(image)
end2 = time.time()
log_string('time of epoch {}: {:.2f}s'.format(n+1, end2-start2))
end = time.time()
log_string("Total time: {:.2f}s, ETA: {:.2f}min".format(end - start, remain*(end - start)/60))
return image[0].numpy()
其中用到的各种函数如下
from __future__ import absolute_import, division, print_function, unicode_literals
import time
from abc import ABC
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from logger import log_string
mpl.rcParams['figure.figsize'] = (13, 10)
mpl.rcParams['axes.grid'] = False
def load_img(path_to_img):
max_dim = 512
img = tf.io.read_file(path_to_img)
img = tf.image.decode_jpeg(img)
img = tf.image.convert_image_dtype(img, tf.float32)
shape = tf.cast(tf.shape(img)[:-1], tf.float32)
long = max(shape)
scale = max_dim / long
new_shape = tf.cast(shape * scale, tf.int32)
img = tf.image.resize(img, new_shape)
img = img[tf.newaxis, :]
return img
def vgg_layers(layer_names):
""" Creates a vgg model that returns a list of intermediate output values."""
vgg = tf.keras.applications.VGG19(include_top=False, weights='imagenet')
vgg.trainable = False
outputs = [vgg.get_layer(name).output for name in layer_names]
model = tf.keras.Model([vgg.input], outputs)
return model
def gram_matrix(input_tensor):
result = tf.linalg.einsum('bijc,bijd->bcd', input_tensor, input_tensor)
input_shape = tf.shape(input_tensor)
num_locations = tf.cast(input_shape[1] * input_shape[2], tf.float32)
return result / num_locations
class StyleContentModel(tf.keras.models.Model, ABC):
def __init__(self, style_layers_, content_layers_):
super(StyleContentModel, self).__init__()
self.vgg = vgg_layers(style_layers_ + content_layers_)
self.style_layers = style_layers_
self.content_layers = content_layers_
self.num_style_layers = len(style_layers_)
self.vgg.trainable = False
def call(self, input_, **kwargs):
input_ = input_ * 255.0
preprocessed_input = tf.keras.applications.vgg19.preprocess_input(input_)
outputs = self.vgg(preprocessed_input)
style_outputs, content_outputs = (
outputs[:self.num_style_layers],
outputs[self.num_style_layers:])
style_outputs = [gram_matrix(style_output)
for style_output in style_outputs]
content_dict = {content_name: value
for content_name, value
in zip(self.content_layers, content_outputs)}
style_dict = {style_name: value
for style_name, value
in zip(self.style_layers, style_outputs)}
return {'content': content_dict, 'style': style_dict}
读取视频并保存每一帧
新建generate.py文件
这里使用OpenCV的VideoCapture读取视频, 然后一帧一帧保存下来
这个函数只要运行一次就行了, 然后就可以注释掉了
def split_frames():
cap = cv.VideoCapture('./video/video.mp4')
cnt = 0
while True:
ret, frame = cap.read()
if not ret:
break
cnt += 1
log_string('saving frame {}'.format(cnt))
cv.imwrite('./contents/{}.jpg'.format(cnt), frame)
cap.release()
log_string('end')
log_string('\\r')
log_string('\\r')
最后, 再将保存好的这些原视频的帧, 一张一张传递给train()函数即可.
但是这里需要注意, os.listdir()返回的目录下的所有文件, 是按文件名的字典序排序的, 也就是说, 假如有1.jpg, 2.jpg, …, 20.jpg这20个图像, 它返回的结果是
1.jpg, 11.jpg, 12.jpg, 13.jpg, 14.jpg, 15.jpg, 16.jpg, 17.jpg, 18.jpg, 19.jpg,
2.jpg, 21.jpg, 22.jpg, 23.jpg, 24.jpg, 25.jpg, 26.jpg, 27.jpg, 28.jpg, 29.jpg
因此保险起见, 最好对它重新排个序(当然不排序的话, 只要按原文件名去保存转换好的图像, 也是可以的, 但是为啥推荐做排序, 假设处理某一帧的时候出错了, 那么排序后能够快速定位到这一帧的索引, 方便后续DEBUG)
files = os.listdir(content_dir)
num_frames = len(files)
files.sort(key=lambda x: int(x[:-4]))
这里需要将x.jpg中的最后4个字符.jpg去掉, 只留下一个int型的x, 然后升序排序.
def generate():
style_img = load_img('./styles/denoised_starry.jpg')
content_dir = './contents/'
files = os.listdir(content_dir)
num_frames = len(files)
files.sort(key=lambda x: int(x[:-4]))
num_epochs = 5
begin = args.begin-1
end = args.end
cnt = begin
for cur in range(begin, end):
file = files[cur]
log_string('-' * 60)
cnt += 1
save_name = './generates/{}.jpg'.format(cnt)
log_string('cnt: {}/{}, current image: {}'.format(cnt, num_frames, file))
content_img = load_img('{}/{}'.format(content_dir, file))
img_styled = train(content_img, style_img, num_epochs, num_frames - cnt) * 255
log_string('saving to {}'.format(save_name))
img_styled = cv.cvtColor(img_styled, cv.COLOR_RGB2BGR)
cv.imwrite(save_name, img_styled)
log_string('-' * 60)
log_string('\\r')
if __name__ == '__main__':
split_frames()
问题
由于整个模型需要的时间非常长, 过程中万一出现异常, 那就直接终止了, 比如你睡前开始运行…结果半小时后它碰到异常停了…那一晚上就浪费了…为了避免重新跑带来的麻烦, 需要提高点鲁棒性. 最容易发生的问题其实是OOM爆显存. 一旦发生了任何问题, 整个程序就停止了, 这时候就体现出来日志的重要性了, 查看日志可以快速地定位到模型处理到哪一帧, 然后后期再补上这一帧.
其实只需要给generate.py用try catch捕捉异常即可. 等模型跑完之后, 查看日志, 看看有多少个ERROR, 重新跑一遍这些帧即可, 不用把所有的6900帧都跑一遍.
新建main.py
import os
from logger import log_string
if __name__ == '__main__':
steps = 50
for begin in range(135, 6835, steps):
cmd = 'python generate.py -begin {} -end {}'.format(begin, begin+steps-1)
try:
log_string('run {}'.format(cmd))
os.system(cmd)
except:
log_string('cmd {} failed'.format(cmd), 'ERROR')
最后运行的时候只需要运行main.py即可. 由于加了try catch, 碰到异常也不会终止进程. 发生异常的时候会在日志里面记录, 最后只要等它跑完, 去日志里面定位ERROR, 然后把这些缺失的帧补上就行.
|