简介
环境
NumPy 是使用 Python 进行科学计算的基础包。
Numba 是一个开源 JIT 编译器,它将 Python 和 NumPy 代码的子集转换为快速机器码。
SciPy 是数学、科学和工程的开源软件。SciPy 库依赖于 NumPy,它提供方便快捷的 N 维数组操作。
h5py 从 Python 读取和写入 HDF5文件。
pandas 用于数据分析、时间序列和统计的强大的数据结构。
opencv-python 用于 Python 的预构建 OpenCV 包。
moviepy 用于进行视频处理的工具包
Filterpy 实现了卡尔曼滤波器和粒子滤波器等
conda create -n CGLearnIng36 python=3.6
amqp==2.5.2
billiard==3.6.3.0
celery==4.4.2
certifi==2019.11.28
chardet==3.0.4
click==7.1.1
colorama==0.3.9
cycler==0.10.0
Cython==0.29.4
decorator==4.4.2
Django==2.2.10
django-redis==4.11.0
filterpy==1.4.5
fire==0.1.3
Flask==1.1.1
h5py==2.8.0
idna==2.8
imageio==2.8.0
imageio-ffmpeg==0.4.1
importlib-metadata==1.6.0
imutils==0.5.3
itsdangerous==1.1.0
Jinja2==2.11.1
Kalman==0.1.3
kiwisolver==1.1.0
kombu==4.6.8
llvmlite==0.31.0
MarkupSafe==1.1.1
matplotlib==2.2.3
moviepy==1.0.1
numba==0.39.0
numpy==1.15.4
opencv-python==3.4.3.18
pandas==0.23.4
Pillow==7.0.0
proglog==0.1.9
protobuf==3.11.3
pyparsing==2.4.6
python-dateutil==2.8.1
pytz==2019.3
redis==3.4.1
requests==2.21.0
scipy==1.1.0
six==1.14.0
sqlparse==0.3.1
tensorboardX==1.6
torch==0.4.1
torchvision==0.2.1
tqdm==4.29.1
urllib3==1.24.3
vine==1.3.0
Werkzeug==1.0.0
zipp==3.1.0
numba
numba是一个用于编译Python数组和数值计算函数的编译器,这个编译器能够大幅提高直接使用Python编写的函数的运算速度。 numba使用LLVM编译器架构将纯Python代码生成优化过的机器码,通过一些添加简单的注解,将面向数组和使用大量数学的python代码优化到与c,c++和Fortran类似的性能,而无需改变Python的解释器。numba的编译方式如下图所示: 虽然有 cython 和 Pypy 之类的许多其他编译器,选择Numbade 原因很简单,不必离开 python 代码的舒适区,不需要为了获得一些的加速来改变您的代码,我们只需要添加一个装饰器到Python函数中即可完成加速,而且加速效果与cython代码相当。
numba对代码进行加速时,给要优化的函数加上@jit优化器即可。使用jit的时候可以让numba来决定什么时候以及怎么做优化。如下简单的例子所示:
from numba import jit
@jit
def f(x, y):
return x + y
这段代码的计算在被调用是第一次执行,numba将在调用期间推断参数类型,然后基于这个信息生成优化后的代码。numba也能够基于输入的类型编译生成特定的代码。例如,对于上面的代码,传入整数和浮点数作为参数将会生成不同的代码: Numba编译的函数可以调用其他编译函数
@jit
def hypot(x, y):
return math.sqrt(square(x) + square(y))
from numba import jit
import time
@jit
def foo():
x = []
for a in range(100000000):
x.append(a)
def foo_withoutfit():
y = []
for b in range(100000000):
y.append(b)
现在我们定义相同的方法,实现的功能也是一样的,一个是利用numba进行加速,一个没有加速,我们看下他们的运行时间:
imutils
imutils是在OPenCV基础上的一个封装,达到更为简结的调用OPenCV接口的目的,它可以轻松的实现图像的平移,旋转,缩放,骨架化等一系列的操作。
- 图像平移
OpenCV中也提供了图像平移的实现,要先计算平移矩阵,然后利用仿射变换实现平移,在imutils中可直接进行图像的平移。
translated = imutils.translate(img,x,y)
- 图像缩放
图片的缩放在OPenCV中要注意确保保持宽高比。而在imutils中自动保持原有图片的宽高比,只指定宽度weight和Height即可。
img = cv.imread("lion.jpeg")
resized = imutils.resize(img,width=200)
print("原图像大小: ", img.shape)
print("缩放后大小:",resized.shape)
plt.figure(figsize=[10, 10])
plt.subplot(1,2,1)
plt.imshow(cv.cvtColor(img, cv.COLOR_BGR2RGB))
plt.title('原图')
plt.axis("off")
plt.subplot(1,2,2)
plt.imshow(cv.cvtColor(resized, cv.COLOR_BGR2RGB))
plt.title('缩放结果')
plt.axis("off")
plt.show()
- 图像旋转
在OpenCV中进行旋转时使用的是仿射变换,在这里图像旋转方法是imutils.rotate(),跟2个参数,第一个是图片数据,第二个是旋转的角度,旋转是朝逆时针方向。同时imutils还提供了另一个相似的方法, rotate_round(),它就是按顺时针旋转的。
import cv2
import imutils
image = cv2.imread('lion.jpeg')
rotated = imutils.rotate(image, 90)
rotated_round = imutils.rotate_bound(image, 90)
plt.figure(figsize=[10, 10])
plt.subplot(1,3,1)
plt.imshow(img[:,:,::-1])
plt.title('原图')
plt.axis("off")
plt.subplot(1,3,2)
plt.imshow(rotated[:,:,::-1])
plt.title('逆时针旋转90度')
plt.axis("off")
plt.subplot(1,3,3)
plt.imshow(rotated_round[:,:,::-1])
plt.title('顺时针旋转90度')
plt.axis("off")
plt.show()
- 骨架提取
骨架提取,是指对图片中的物体进行拓扑骨架(topological skeleton)构建的过程。imutils提供的方法是skeletonize(),第二个参数是结构参数的尺寸(structuring element),相当于是一个粒度,越小需要处理的时间越长。
import cv2
import imutils
image = cv2.imread('lion.jpeg')
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
skeleton = imutils.skeletonize(gray, size=(3, 3))
plt.figure(figsize=(10,8),dpi=100)
plt.subplot(121),plt.imshow(img[:,:,::-1]),plt.title('原图')
plt.xticks([]), plt.yticks([])
plt.subplot(122),plt.imshow(skeleton,cmap="gray"),plt.title('骨架提取结果')
plt.xticks([]), plt.yticks([])
plt.show()
- Matplotlib显示
在OpenCV的Python绑定中,图像以BGR顺序表示为NumPy数组。使用该cv2.imshow功能时效果很好。但是,如果打算使用Matplotlib,该plt.imshow函数将假定图像按RGB顺序排列。调用cv2.cvtColor解决此问题,也可以使用opencv2matplotlib便捷功能。
img = cv.imread("lion.jpeg")
plt.figure()
plt.imshow(imutils.opencv2matplotlib(img))
- OPenCV版本的检测
OpenCV 4发布之后,随着主要版本的更新,向后兼容性问题尤为突出。在使用OPenCV时,应检查当前正在使用哪个版本的OpenCV,然后使用适当的函数或方法。在imutils中的is_cv2()、is_cv3()和is_cv4()是可用于自动确定当前环境的OpenCV的版本简单的功能。
print("OPenCV版本: {}".format(cv2.__version__))
print("OPenCV是2.X? {}".format(imutils.is_cv2()))
print("OPenCV是3.X? {}".format(imutils.is_cv3()))
print("OPenCV是4.X? {}".format(imutils.is_cv4()))
cv.dnn
OPenCV自3.3版本开始,加入了对深度学习网络的支持,即DNN模块,它支持主流的深度学习框架生成与到处模型的加载。
OpenCV中的深度学习模块(DNN)只提供了推理功能,不涉及模型的训练,支持多种深度学习框架,比如TensorFlow,Caffe,Torch和Darknet。
- 架构
除了上述的加速方法外,DNN模块还有网络层面的优化。这种优化优化分两类,一类是层融合,还有一类是内存复用。
dnn.blobFromImage 作用:根据输入图像,创建维度N(图片的个数),通道数C,高H和宽W次序的blobs
blobFromImage(image,
scalefactor=None,
size=None,
mean=None,
swapRB=None,
crop=None,
ddepth=None):
import cv2
from cv2 import dnn
import numpy as np
import matplotlib.pyplot as plt
img_cv2 = cv2.imread("test.jpeg")
print("原图像大小: ", img_cv2.shape)
inWidth = 256
inHeight = 256
outBlob1 = cv2.dnn.blobFromImage(img_cv2,
scalefactor=1.0 / 255,
size=(inWidth, inHeight),
mean=(0, 0, 0),
swapRB=False,
crop=False)
print("未裁剪输出: ", outBlob1.shape)
outimg1 = np.transpose(outBlob1[0], (1, 2, 0))
outBlob2 = cv2.dnn.blobFromImage(img_cv2,
scalefactor=1.0 / 255,
size=(inWidth, inHeight),
mean=(0, 0, 0),
swapRB=False,
crop=True)
print("裁剪输出: ", outBlob2.shape)
outimg2 = np.transpose(outBlob2[0], (1, 2, 0))
plt.figure(figsize=[10, 10])
plt.subplot(1, 3, 1)
plt.title('输入图像', fontsize=16)
plt.imshow(cv2.cvtColor(img_cv2, cv2.COLOR_BGR2RGB))
plt.axis("off")
plt.subplot(1, 3, 2)
plt.title('输出图像 - 未裁剪', fontsize=16)
plt.imshow(cv2.cvtColor(outimg1, cv2.COLOR_BGR2RGB))
plt.axis("off")
plt.subplot(1, 3, 3)
plt.title('输出图像 - 裁剪', fontsize=16)
plt.imshow(cv2.cvtColor(outimg2, cv2.COLOR_BGR2RGB))
plt.axis("off")
plt.show()
另外一个API与上述API类似,是进行批量图片处理的
blobFromImages(images,
scalefactor=None,
size=None, mean=None,
swapRB=None,
crop=None,
ddepth=None):
作用:批量处理图片,创建4维的blob,其它参数类似于 dnn.blobFromImage
.dnn.NMSBoxes 作用:根据给定的检测boxes和对应的scores进行NMS(非极大值抑制)处理
NMSBoxes(bboxes,
scores,
score_threshold,
nms_threshold,
eta=None,
top_k=None)
dnn.readNet 作用:加载深度学习网络及其模型参数
readNet(model, config=None, framework=None)
对应于特定框架的API:
Caffe
readNetFromCaffe(prototxt, caffeModel=None)
作用:加载采用Caffe的配置网络和训练的权重参数 Darknet
readNetFromDarknet(cfgFile, darknetModel=None)
作用:加载采用Darknet的配置网络和训练的权重参数
Tensorflow
readNetFromTensorflow(model, config=None)
作用:加载采用Tensorflow 的配置网络和训练的权重参数
参数:
- model: .pb 文件
- config: .pbtxt 文件
Torch
readNetFromTorch(model, isBinary=None)
作用:加载采用 Torch 的配置网络和训练的权重参数
参数:
- model: 采用 torch.save()函数保存的文件
ONNX
readNetFromONNX(onnxFile)
作用:加载 .onnx 模型网络配置参数和权重参数
车流量检测
多目标追踪
多目标跟踪分类
多目标跟踪,即MOT(Multi-Object Tracking),也就是在一段视频中同时跟踪多个目标。MOT主要应用在安防监控和自动驾驶等领域中。
- 初始化方法
多目标跟踪问题中并不是所有目标都会在第一帧出现,也并不是所有目标都会出现在每一帧。那如何对出现的目标进行初始化,可以作为跟踪算法的分类表针。常见的初始化方法分为两大类,一个是Detection-Based-Tracking(DBT),一个是Detection-Free-Tracking(DFT)。下图比较形象地说明了两类算法的区别。 - 处理模式
MOT也存在着不同的处理模式,Online和Offline两大类,其主要区别在于是否用到了后续帧的信息。下图形象解释了Online与Offline跟踪的区别。
运动模型
为了简化多目标跟踪的难度,我们引入运动模型类简化求解过程,运动模型捕捉目标的动态行为,它估计目标在未来帧中的潜在位置,从而减少搜索空间。在大多数情况下,假设目标在现实中是平缓运动的,那么在图像空间也是如此。对于车辆的运动,大致可分为线性和非线性两种运动:
- 线性运动:线性运动模型是目前最主流的模型,即假设目标的运动属性平稳(速度,加速度,位置);
- 非线性运动:虽然线性运动模型比较常用,但由于存在它解决不了的问题,非线性运动模型随之诞生。它可以使tracklets间运动相似度计算得更加准确。
跟踪方法
多目标跟踪中基于神经网络的算法,端到端的算法并不多,主要还在实验室的刷榜阶段,模型复杂,速度慢,追踪结果也不好,我们就不再介绍,主要给大家介绍以下两种主流的算法:
- 基于Kalman和KM算法的后端优化算法
该类算法能达到实时性,但依赖于检测算法效果要好,特征区分要好(输出最终结果的好坏依赖于较强的检测算法,而基于卡尔曼加匈牙利匹配的追踪算法作用在于能够输出检测目标的id,其次能保证追踪算法的实时性),这样追踪效果会好,id切换少。代表性的算法是SORT/DeepSORT。 SORT 是一种实用的多目标跟踪算法,引入了线性速度模型与卡尔曼滤波来进行位置预测,在无合适匹配检测框的情况下,使用运动模型来预测物体的位置。匈牙利算法是一种寻找二分图的最大匹配的算法,在多目标跟踪问题中可以简单理解为寻找前后两帧的若干目标的匹配最优解的一种算法。而卡尔曼滤波可以看作是一种运动模型,用来对目标的轨迹进行预测,并且使用确信度较高的跟踪结果进行预测结果的修正。
多目标追踪一般接在目标检测后。在工业界目标检测采用比较多的是yolo检测网络,单阶段式,速度快,精度不差,部署在NV的平台帧率可以达到30fps以上。所以要实现目标检测代码和多目标追踪代码集成的任务,需要先将两者的框架统一。先实现目标检测网络,检测的输出结果主要是将检测框的位置信息输入到多目标追踪算法中。
- 基于多线程的单目标跟踪的多目标跟踪算法
这类算法特点是跟踪效果会很好,因为其为每一类物体都单独分配了一个跟踪器。但该算法对目标尺度变化要求较大,参数调试需要合理,同时该算法极耗cpu资源,实时性不高,代表算法是利用KCF进行目标跟踪。
多目标追踪本质上是多个目标同时运动的问题,所以有提出将单目标跟踪器引入到多目标追踪的问题,为每一个目标分配一个跟踪器,然后间接地使用匹配算法来修正那些跟踪失败或新出现的目标。代表性的单目标跟踪算法为核相关滤波算法(KCF),在精度和速度上能够同时达到很高的水平,是当时单目标跟踪最优秀的算法之一,后来的很多单目标跟踪算法都是基于此做的改进。
实际应用过程中会为每个目标分配一个KCF跟踪器并采用多线程的方式来组织这些跟踪器。同时因为实际硬件条件的限制,不可能提供强大的计算力资源,会采用检测器与跟踪器交替进行的跟踪策略。由于检测的帧率不高,使得跟踪的维持效果出现滞后或框飘的现象较为严重,实用性不大。
辅助功能
IOU是交并比(Intersection-over-Union)是目标检测中使用的一个概念是产生的候选框(candidate bound)与原标记框(ground truth bound)的交叠率,即它们的交集与并集的比值。最理想情况是完全重叠,即比值为1。在多目标跟踪中,用来判别跟踪框和目标检测框之间的相似度。
计算交并比
oU是两个区域的交除以两个区域的并得出的结果
def iou(bb_test, bb_gt):
"""
在两个box间计算IOU
:param bb_test: box1 = [x1y1x2y2]
:param bb_gt: box2 = [x1y1x2y2]
:return: 交并比IOU
"""
xx1 = np.maximum(bb_test[0], bb_gt[0])
yy1 = np.maximum(bb_test[1], bb_gt[1])
xx2 = np.minimum(bb_test[2], bb_gt[2])
yy2 = np.minimum(bb_test[3], bb_gt[3])
w = np.maximum(0., xx2 - xx1)
h = np.maximum(0., yy2 - yy1)
wh = w * h
o = wh / ((bb_test[2] - bb_test[0]) * (bb_test[3] - bb_test[1]) + (bb_gt[2] - bb_gt[0]) * (
bb_gt[3] - bb_gt[1]) - wh)
return o
候选框的表示形式
def convert_bbox_to_z(bbox):
"""
将[x1,y1,x2,y2]形式的检测框转为滤波器的状态表示形式[x,y,s,r]。其中x,y是框的中心坐标,s是面积,尺度,r是宽高比
:param bbox: [x1,y1,x2,y2] 分别是左上角坐标和右下角坐标
:return: [ x, y, s, r ] 4行1列,其中x,y是box中心位置的坐标,s是面积,r是纵横比w/h
"""
w = bbox[2] - bbox[0]
h = bbox[3] - bbox[1]
x = bbox[0] + w / 2.
y = bbox[1] + h / 2.
s = w * h
r = w / float(h)
return np.array([x, y, s, r]).reshape((4, 1))
def convert_x_to_bbox(x, score=None):
"""
将[cx,cy,s,r]的目标框表示转为[x_min,y_min,x_max,y_max]的形式
:param x:[ x, y, s, r ],其中x,y是box中心位置的坐标,s是面积,r
:param score: 置信度
:return:[x1,y1,x2,y2],左上角坐标和右下角坐标
"""
w = np.sqrt(x[2] * x[3])
h = x[2] / w
if score is None:
return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2.]).reshape((1, 4))
else:
return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2., score]).reshape((1, 5))
卡尔曼滤波器
简介
卡尔曼滤波(Kalman)无论是在单目标还是多目标领域都是很常用的一种算法,我们将卡尔曼滤波看做一种运动模型,用来对目标的位置进行预测,并且利用预测结果对跟踪的目标进行修正,属于自动控制理论中的一种方法。
在对视频中的目标进行跟踪时,当目标运动速度较慢时,很容易将前后两帧的目标进行关联,如下图所示: 如果目标运动速度比较快,或者进行隔帧检测时,在后续帧中,目标A已运动到前一帧B所在的位置,这时再进行关联就会得到错误的结果,将A‘与B关联在一起。 那怎么才能避免这种出现关联误差呢?我们可以在进行目标关联之前,对目标在后续帧中出现的位置进行预测,然后与预测结果进行对比关联,如下图所示: 我们在对比关联之前,先预测出A和B在下一帧中的位置,然后再使用实际的检测位置与预测的位置进行对比关联,只要预测足够精确,几乎不会出现由于速度太快而存在的误差。
卡尔曼滤波就可以用来预测目标在后续帧中出现的位置,如下图所示,卡尔曼滤波器就可以根据前面五帧数据目标的位置,预测第6帧目标的位置。 卡尔曼滤波器最大的优点是采用递归的方法来解决线性滤波的问题,它只需要当前的测量值和前一个周期的预测值就能够进行状态估计。由于这种递归方法不需要大量的存储空间,每一步的计算量小,计算步骤清晰,非常适合计算机处理,因此卡尔曼滤波受到了普遍的欢迎,在各种领域具有广泛的应用前景。
原理
我们假设一个简单的场景,有一辆小车在行驶,它的速度是v,可以通过观测得到它的位置p,也就是说我们可以实时的观测小车的状态。
- 场景描述
虽然我们比较确定小车此时的状态,无论是计算还是检测都会存在一定的误差,所以我们只能认为当前状态是其真实状态的一个最优估计。那么我们不妨认为当前状态服从一个高斯分布,如下图所示:
卡尔曼滤波器实践
filterpy
FilterPy是一个实现了各种滤波器的Python模块,它实现著名的卡尔曼滤波和粒子滤波器。我们可以直接调用该库完成卡尔曼滤波器实现。其中的主要模块包括: 开源代码
- 初始化
预先设定状态变量dim_x和观测变量维度dim_z、协方差矩阵P、运动形式和观测矩阵H等,一般各个协方差矩阵都会初始化为单位矩阵,根据特定的场景需要相应的设置。
def __init__(self, dim_x, dim_z, dim_u = 0, x = None, P = None,
Q = None, B = None, F = None, H = None, R = None):
"""Kalman Filter
Refer to http:/github.com/rlabbe/filterpy
Method
-----------------------------------------
Predict | Update
-----------------------------------------
| K = PH^T(HPH^T + R)^-1
x = Fx + Bu | y = z - Hx
P = FPF^T + Q | x = x + Ky
| P = (1 - KH)P
| S = HPH^T + R
-----------------------------------------
note: In update unit, here is a more numerically stable way: P = (I-KH)P(I-KH)' + KRK'
Parameters
----------
dim_x: int
dims of state variables, eg:(x,y,vx,vy)->4
dim_z: int
dims of observation variables, eg:(x,y)->2
dim_u: int
dims of control variables,eg: a->1
p = p + vt + 0.5at^2
v = v + at
=>[p;v] = [1,t;0,1][p;v] + [0.5t^2;t]a
"""
assert dim_x >= 1, 'dim_x must be 1 or greater'
assert dim_z >= 1, 'dim_z must be 1 or greater'
assert dim_u >= 0, 'dim_u must be 0 or greater'
self.dim_x = dim_x
self.dim_z = dim_z
self.dim_u = dim_u
self.x = np.zeros((dim_x, 1)) if x is None else x
self.P = np.eye(dim_x) if P is None else P
self.Q = np.eye(dim_x) if Q is None else Q
self.B = None if B is None else B
self.F = np.eye(dim_x) if F is None else F
self.H = np.zeros((dim_z, dim_x)) if H is None else H
self.R = np.eye(dim_z) if R is None else R
self._alpha_sq = 1.
self.z = np.array([[None] * self.dim_z]).T
self.K = np.zeros((dim_x, dim_z))
self.y = np.zeros((dim_z, 1))
self.S = np.zeros((dim_z, dim_z))
self.SI = np.zeros((dim_z, dim_z))
self.inv = np.linalg.inv
self._mahalanobis = None
self.latest_state = 'init'
- 预测阶段
接下来进入预测环节,为了保证通用性,引入了遗忘系数α,其作用在于调节对过往信息的依赖程度,α越大对历史信息的依赖越小:
def predict(self, u = None, B = None, F = None, Q = None):
"""
Predict next state (prior) using the Kalman filter state propagation equations:
x = Fx + Bu
P = fading_memory*FPF^T + Q
Parameters
----------
u : ndarray
Optional control vector. If not `None`, it is multiplied by B
to create the control input into the system.
B : ndarray of (dim_x, dim_z), or None
Optional control transition matrix; a value of None
will cause the filter to use `self.B`.
F : ndarray of (dim_x, dim_x), or None
Optional state transition matrix; a value of None
will cause the filter to use `self.F`.
Q : ndarray of (dim_x, dim_x), scalar, or None
Optional process noise matrix; a value of None will cause the
filter to use `self.Q`.
"""
if B is None:
B = self.B
if F is None:
F = self.F
if Q is None:
Q = self.Q
elif np.isscalar(Q):
Q = np.eye(self.dim_x) * Q
if B is not None and u is not None:
self.x = F @ self.x + B @ u
else:
self.x = F @ self.x
self.P = self._alpha_sq * (F @ self.P @ F.T) + Q
self.latest_state = 'predict'
def update(self, z, R = None, H = None):
"""
Update Process, add a new measurement (z) to the Kalman filter.
K = PH^T(HPH^T + R)^-1
y = z - Hx
x = x + Ky
P = (1 - KH)P or P = (I-KH)P(I-KH)' + KRK'
If z is None, nothing is computed.
Parameters
----------
z : (dim_z, 1): array_like
measurement for this update. z can be a scalar if dim_z is 1,
otherwise it must be convertible to a column vector.
R : ndarray, scalar, or None
Optionally provide R to override the measurement noise for this
one call, otherwise self.R will be used.
H : ndarray, or None
Optionally provide H to override the measurement function for this
one call, otherwise self.H will be used.
"""
if z is None:
self.z = np.array([[None] * self.dim_z]).T
self.y = np.zeros((self.dim_z, 1))
return
z = reshape_z(z, self.dim_z, self.x.ndim)
if R is None:
R = self.R
elif np.isscalar(R):
R = np.eye(self.dim_z) * R
if H is None:
H = self.H
if self.latest_state == 'predict':
PHT = self.P @ H.T
self.S = H @ PHT + R
self.SI = self.inv(self.S)
self.K = PHT @ self.SI
I_KH = np.eye(self.dim_x) - self.K @ H
self.P = I_KH @ self.P @ I_KH.T + self.K @ R @ self.K.T
self.y = z - H @ self.x
self._mahalanobis = math.sqrt(float(self.y.T @ self.SI @ self.y))
self.x = self.x + self.K @ self.y
self.latest_state = 'update'
小车案例
from matplotlib import pyplot as plt
import seaborn as sns
import numpy as np
from filterpy.kalman import KalmanFilter
- 小车运动数据生成
在这里我们假设小车作速度为1的匀速运动
z = np.linspace(1,1000,1000)
mu,sigma = 0,1
noise = np.random.normal(mu,sigma,1000)
z_nosie = z+noise
my_filter = KalmanFilter(dim_x=2, dim_z=1)
my_filter.x = np.array([[0.], [0.]])
my_filter.P = np.array([[1., 0.], [0., 1.]])
my_filter.F = np.array([[1., 1.], [0., 1.]])
my_filter.Q = np.array([[0.0001, 0.], [0., 0.0001]])
my_filter.H = np.array([[1, 0]])
my_filter.R = 1
z_new_list = []
v_new_list = []
for k in range(len(z_nosie)):
my_filter.predict()
my_filter.update(z_nosie[k])
x = my_filter.x
z_new_list.append(x[0][0])
v_new_list.append(x[1][0])
dif_list = []
for k in range(len(z)):
dif_list.append(z_new_list[k]-z[k])
v_dif_list = []
for k in range(len(z)):
v_dif_list.append(v_new_list[k]-1)
plt.figure(figsize=(20,9))
plt.subplot(1,2,1)
plt.xlim(-50,1050)
plt.ylim(-3.0,3.0)
plt.scatter(range(len(z)),dif_list,color ='b',label = "位置偏差")
plt.scatter(range(len(z)),v_dif_list,color ='y',label = "速度偏差")
plt.legend()
- 卡尔曼滤波器参数的变化
首先定义方法将卡尔曼滤波器的参数堆叠成一个矩阵,右下角补0,我们看一下参数的变化
def filter_comb(p, f, q, h, r):
a = np.hstack((p, f))
b = np.array([r, 0])
b = np.vstack([h, b])
b = np.hstack((q, b))
a = np.vstack((a, b))
return a
z_new_list = []
v_new_list = []
for k in range(1):
my_filter.predict()
my_filter.update(z_nosie[k])
x = my_filter.x
c = filter_comb(my_filter.P,my_filter.F,my_filter.Q,my_filter.H,my_filter.R)
plt.figure(figsize=(32,18))
sns.set(font_scale=4)
sns.heatmap(c,square=True,annot=True,xticklabels=False,yticklabels=False,cbar=False)
从图中可以看出变化的P,其他的参数F,Q,H,R为变换。另外状态变量x和卡尔曼系数K也是变化的
- 概率密度函数
为了验证卡尔曼滤波的结果优于测量的结果,绘制预测结果误差和测量误差的概率密度函数
z_noise_list_std = np.std(noise)
z_noise_list_avg = np.mean(noise)
z_filterd_list_std = np.std(dif_list)
import seaborn as sns
plt.figure(figsize=(16,9))
ax = sns.kdeplot(noise,shade=True,color="r",label="std=%.3f"%z_noise_list_std)
ax = sns.kdeplot(dif_list,shade=True,color="g",label="std=%.3f"%z_filterd_list_std)
可以看出卡尔曼滤波器预测的结果误差方差更小,优于测试的结果
目标估计模型-卡尔曼滤波
def __init__(self, bbox):
self.kf = KalmanFilter(dim_x=7, dim_z=4)
self.kf.F = np.array(
[[1, 0, 0, 0, 1, 0, 0], [0, 1, 0, 0, 0, 1, 0], [0, 0, 1, 0, 0, 0, 1], [0, 0, 0, 1, 0, 0, 0],
[0, 0, 0, 0, 1, 0, 0], [0, 0, 0, 0, 0, 1, 0], [0, 0, 0, 0, 0, 0, 1]])
self.kf.H = np.array(
[[1, 0, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0, 0], [0, 0, 1, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0, 0]])
self.kf.R[2:, 2:] *= 10.
self.kf.P[4:, 4:] *= 1000.
self.kf.P *= 10.
self.kf.Q[-1, -1] *= 0.01
self.kf.Q[4:, 4:] *= 0.01
self.kf.x[:4] = convert_bbox_to_z(bbox)
self.time_since_update = 0
self.id = KalmanBoxTracker.count
KalmanBoxTracker.count += 1
self.history = []
self.hits = 0
self.hit_streak = 0
self.age = 0
def update(self, bbox):
self.time_since_update = 0
self.history = []
self.hits += 1
self.hit_streak += 1
self.kf.update(convert_bbox_to_z(bbox))
- 进行目标框的预测
推进状态变量并返回预测的边界框结果
def predict(self):
if (self.kf.x[6] + self.kf.x[2]) <= 0:
self.kf.x[6] *= 0.0
self.kf.predict()
self.age += 1
if self.time_since_update > 0:
self.hit_streak = 0
self.time_since_update += 1
self.history.append(convert_x_to_bbox(self.kf.x))
return self.history[-1]
class KalmanBoxTracker(object):
count = 0
def __init__(self, bbox):
"""
初始化边界框和跟踪器
:param bbox:
"""
self.kf = KalmanFilter(dim_x=7, dim_z=4)
self.kf.F = np.array(
[[1, 0, 0, 0, 1, 0, 0], [0, 1, 0, 0, 0, 1, 0], [0, 0, 1, 0, 0, 0, 1], [0, 0, 0, 1, 0, 0, 0],
[0, 0, 0, 0, 1, 0, 0], [0, 0, 0, 0, 0, 1, 0], [0, 0, 0, 0, 0, 0, 1]])
self.kf.H = np.array(
[[1, 0, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0, 0], [0, 0, 1, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0, 0]])
self.kf.R[2:, 2:] *= 10.
self.kf.P[4:, 4:] *= 1000.
self.kf.P *= 10.
self.kf.Q[-1, -1] *= 0.01
self.kf.Q[4:, 4:] *= 0.01
self.kf.x[:4] = convert_bbox_to_z(bbox)
self.time_since_update = 0
self.id = KalmanBoxTracker.count
KalmanBoxTracker.count += 1
self.history = []
self.hits = 0
self.hit_streak = 0
self.age = 0
def update(self, bbox):
"""
使用观察到的目标框更新状态向量。filterpy.kalman.KalmanFilter.update 会根据观测修改内部状态估计self.kf.x。
重置self.time_since_update,清空self.history。
:param bbox:目标框
:return:
"""
self.time_since_update = 0
self.history = []
self.hits += 1
self.hit_streak += 1
self.kf.update(convert_bbox_to_z(bbox))
def predict(self):
"""
推进状态向量并返回预测的边界框估计。
将预测结果追加到self.history。由于 get_state 直接访问 self.kf.x,所以self.history没有用到
:return:
"""
if (self.kf.x[6] + self.kf.x[2]) <= 0:
self.kf.x[6] *= 0.0
self.kf.predict()
self.age += 1
if self.time_since_update > 0:
self.hit_streak = 0
self.time_since_update += 1
self.history.append(convert_x_to_bbox(self.kf.x))
return self.history[-1]
def get_state(self):
"""
返回当前边界框估计值
:return:
"""
return convert_x_to_bbox(self.kf.x)
匈牙利、KM算法
匈牙利算法(Hungarian Algorithm)与KM算法(Kuhn-Munkres Algorithm)是用来解决多目标跟踪中的数据关联问题,匈牙利算法与KM算法都是为了求解二分图的最大匹配问题。 有一种很特别的图,就做二分图,那什么是二分图呢?就是能分成两组,U,V。其中,U上的点不能相互连通,只能连去V中的点,同理,V中的点不能相互连通,只能连去U中的点。这样,就叫做二分图。
可以把二分图理解为视频中连续两帧中的所有检测框,第一帧所有检测框的集合称为U,第二帧所有检测框的集合称为V。同一帧的不同检测框不会为同一个目标,所以不需要互相关联,相邻两帧的检测框需要相互联通,最终将相邻两帧的检测框尽量完美地两两匹配起来。而求解这个问题的最优解就要用到匈牙利算法或者KM算法。
匈牙利算法
匈牙利算法是一种在多项式时间内求解任务分配问题的组合优化算法。美国数学家哈罗德·库恩于1955年提出该算法。此算法之所以被称作匈牙利算法,是因为算法很大一部分是基于以前匈牙利数学家Dénes K?nig和Jen? Egerváry的工作之上创建起来的。
我们以目标跟踪的方法介绍匈牙利算法,以下图为例,假设左边的四张图是我们在第N帧检测到的目标(U),右边四张图是我们在第N+1帧检测到的目标(V)。红线连起来的图,是我们的算法认为是同一行人可能性较大的目标。由于算法并不是绝对理想的,因此并不一定会保证每张图都有一对一的匹配,一对二甚至一对多,再甚至多对多的情况都时有发生。这时我们怎么获得最终的一对一跟踪结果呢?我们来看匈牙利算法是怎么做的。
- 第一步
首先给左1进行匹配,发现第一个与其相连的右1还未匹配,将其配对,连上一条蓝线。 - 第二步
接着匹配左2,发现与其相连的第一个目标右2还未匹配,将其配对 - 第三步
**接下来是左3,发现最优先的目标右1已经匹配完成了,怎么办呢?
我们给之前右1的匹配对象左1分配另一个对象。
(黄色表示这条边被临时拆掉)** **可以与左1匹配的第二个目标是右2,但右2也已经有了匹配对象,怎么办呢?
我们再给之前右2的匹配对象左2分配另一个对象(注意这个步骤和上面是一样的,这是一个递归的过程)。** **此时发现左2还能匹配右3,那么之前的问题迎刃而解了,回溯回去。
左2对右3,左1对右2,左3对右1。** 所以第三步最后的结果就是: 最后是左4,很遗憾,按照第三步的节奏我们没法给左4腾出来一个匹配对象,只能放弃对左4的匹配,匈牙利算法流程至此结束。蓝线就是我们最后的匹配结果。至此我们找到了这个二分图的一个最大匹配。最终的结果是我们匹配出了三对目标,由于候选的匹配目标中包含了许多错误的匹配红线(边),所以匹配准确率并不高。可见匈牙利算法对红线连接的准确率要求很高,也就是要求我们运动模型、外观模型等部件必须进行较为精准的预测,或者预设较高的阈值,只将置信度较高的边才送入匈牙利算法进行匹配,这样才能得到较好的结果。
匈牙利算法的流程大家看到了,有一个很明显的问题相信大家也发现了,按这个思路找到的最大匹配往往不是我们心中的最优。匈牙利算法将每个匹配对象的地位视为相同,在这个前提下求解最大匹配。这个和我们研究的多目标跟踪问题有些不合,因为每个匹配对象不可能是同等地位的,总有一个真实目标是我们要找的最佳匹配,而这个真实目标应该拥有更高的权重,在此基础上匹配的结果才能更贴近真实情况。
KM算法就能比较好地解决这个问题,我们下面来看看KM算法。
KM算法
KM算法解决的是带权二分图的最优匹配问题。
还是用上面的图来举例子,这次给每条连接关系加入了权重,也就是我们算法中其他模块给出的置信度分值。
- 第一步
首先对每个顶点赋值,称为顶标,将左边的顶点赋值为与其相连的边的最大权重,右边的顶点赋值为0。 - 第二步
匹配的原则是只和权重与左边分数(顶标)相同的边进行匹配,若找不到边匹配,对此条路径的所有左边顶点的顶标减d,所有右边顶点的顶标加d。参数d我们在这里取值为0.1。
对于左1,与顶标分值相同的边先标蓝。 然后是左2,与顶标分值相同的边标 然后是左3,发现与右1已经与左1配对。首先想到让左3更换匹配对象,然而根据匹配原则,只有权值大于等于0.9+0=0.9(左顶标加右顶标)的边能满足要求。于是左3无法换边。那左1能不能换边呢?对于左1来说,只有权值大于等于0.8+0=0.8的边能满足要求,无法换边。此时根据KM算法,应对所有冲突的边的顶点做加减操作,令左边顶点值减0.1,右边顶点值加0.1。结果如下图所示。 再进行匹配操作,发现左3多了一条可匹配的边,因为此时左3对右2的匹配要求只需权重大于等于0.8+0即可,所以左3与右2匹配 最后进行左4的匹配,由于左4唯一的匹配对象右3已被左2匹配,发生冲突。进行一轮加减d操作,再匹配,左四还是匹配失败。两轮以后左4期望值降为0,放弃匹配左4。
至此KM算法流程结束,三对目标成功匹配,甚至在左三目标预测不够准确的情况下也进行了正确匹配。可见在引入了权重之后,匹配成功率大大提高。。
最后还有一点值得注意,匈牙利算法得到的最大匹配并不是唯一的,预设匹配边、或者匹配顺序不同等,都可能会导致有多种最大匹配情况,所以有一种替代KM算法的想法是,我们只需要用匈牙利算法找到所有的最大匹配,比较每个最大匹配的权重,再选出最大权重的最优匹配即可得到更贴近真实情况的匹配结果。但这种方法时间复杂度较高,会随着目标数越来越多,消耗的时间大大增加,实际使用中并不推荐。
匈牙利算法的实现
scipy.optimize.linear_sum_assignment(cost_matrix)
实现KM算法,其中const_matrix表示代价矩阵。比如第一帧有a,b,c,d四个目标,第二帧图像有p,q,r,s四个目标,其相似度矩阵如下所示:
from scipy.optimize import linear_sum_assignment
import numpy as np
cost =np.array([[0.9,0.6,0,0],[0,0.3,0.9,0],[0.5,0.9,0,0],[0,0,0.2,0]])
row_ind,col_ind=linear_sum_assignment(-cost)
print("行索引:\n{}".format(row_ind))
print("列索引:\n{}".format(col_ind))
print("匹配度:\n{}".format(cost[row_ind,col_ind]))
数据关联
在这里我们对检测框和跟踪框进行匹配,整个流程是遍历检测框和跟踪框,并进行匹配,匹配成功的将其保留,未成功的将其删除。
def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
"""
将检测框bbox与卡尔曼滤波器的跟踪框进行关联匹配
:param detections:检测框
:param trackers:跟踪框,即跟踪目标
:param iou_threshold:IOU阈值
:return:跟踪成功目标的矩阵:matchs
新增目标的矩阵:unmatched_detections
跟踪失败即离开画面的目标矩阵:unmatched_trackers
"""
if (len(trackers) == 0) or (len(detections) == 0):
return np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int)
iou_matrix = np.zeros((len(detections), len(trackers)), dtype=np.float32)
for d, det in enumerate(detections):
for t, trk in enumerate(trackers):
iou_matrix[d, t] = iou(det, trk)
result = linear_sum_assignment(-iou_matrix)
matched_indices = np.array(list(zip(*result)))
unmatched_detections = []
for d, det in enumerate(detections):
if d not in matched_indices[:, 0]:
unmatched_detections.append(d)
unmatched_trackers = []
for t, trk in enumerate(trackers):
if t not in matched_indices[:, 1]:
unmatched_trackers.append(t)
matches = []
for m in matched_indices:
if iou_matrix[m[0], m[1]] < iou_threshold:
unmatched_detections.append(m[0])
unmatched_trackers.append(m[1])
else:
matches.append(m.reshape(1, 2))
if len(matches) == 0:
matches = np.empty((0, 2), dtype=int)
else:
matches = np.concatenate(matches, axis=0)
return matches, np.array(unmatched_detections), np.array(unmatched_trackers)
SORT/deepSORT
SORT和DeepSORT是多目标跟踪中两个知名度比较高的算法。DeepSORT是原团队对SORT的改进版本。
SORT
SORT核心是卡尔曼滤波和匈牙利匹配两个算法。流程图如下所示,可以看到整体可以拆分为两个部分,分别是匹配过程和卡尔曼预测加更新过程,都用灰色框标出来了。 关键步骤:轨迹卡尔曼滤波预测→ 使用匈牙利算法将预测后的tracks和当前帧中的detecions进行匹配(IOU匹配) → 卡尔曼滤波更新
卡尔曼滤波分为两个过程:预测和更新。SORT引入了线性速度模型与卡尔曼滤波来进行位置预测,先进行位置预测然后再进行匹配。运动模型的结果可以用来预测物体的位置。
匈牙利算法解决的是一个分配问题,用IOU距离作为权重(也叫cost矩阵),并且当IOU小于一定数值时,不认为是同一个目标,理论基础是视频中两帧之间物体移动不会过多。在代码中选取的阈值是0.3。scipy库的linear_sum_assignment都实现了这一算法,只需要输入cost_matrix即代价矩阵就能得到最优匹配。
DeepSort
DeepSORT是SORT的续作,整体框架没有大改,还是延续了卡尔曼滤波加匈牙利算法的思路,在这个基础上增加了鉴别网络Deep Association Metric。
下图是deepSORT流程图,和SORT基本一样,就多了级联匹配(Matching Cascade)和新轨迹的确认(confirmed)。 关键步骤:轨迹卡尔曼滤波预测→ 使用匈牙利算法将预测后的tracks和当前帧中的detecions进行匹配(级联匹配和IOU匹配) → 卡尔曼滤波更新
级联匹配流程图如下所示: 其中上半部分就是相似度估计,也就是算这个分配问题的代价函数。下半部分依旧使用匈牙利算法进行检测框和跟踪框的匹配。
多目标追踪
在这里我们主要实现了一个多目标跟踪器,管理多个卡尔曼滤波器对象,主要包括以下内容:
初始化:最大检测数,目标未被检测的最大帧数 目标跟踪结果的更新,即跟踪成功和失败的目标的更新
def __init__(self, max_age=1, min_hits=3):
"""
初始化:设置SORT算法的关键参数
"""
self.max_age = max_age
self.min_hits = min_hits
self.trackers = []
self.frame_count = 0
- 目标跟踪结果的更新
该方法实现了SORT算法,输入是当前帧中所有物体的检测框的集合,包括目标的score,输出是当前帧标的跟踪框集合,包括目标的跟踪的id要求是即使检测框为空,也必须对每一帧调用此方法,返回一个类似的输出数组,最后一列是目标对像的id。需要注意的是,返回的目标对象数量可能与检测框的数量不同.
def update(self, dets):
self.frame_count += 1
trks = np.zeros((len(self.trackers), 5))
to_del = []
ret = []
for t, trk in enumerate(trks):
pos = self.trackers[t].predict()[0]
trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
if np.any(np.isnan(pos)):
to_del.append(t)
trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
for t in reversed(to_del):
self.trackers.pop(t)
matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets, trks)
for t, trk in enumerate(self.trackers):
if t not in unmatched_trks:
d = matched[np.where(matched[:, 1] == t)[0], 0]
trk.update(dets[d, :][0])
for i in unmatched_dets:
trk = KalmanBoxTracker(dets[i, :])
self.trackers.append(trk)
i = len(self.trackers)
for trk in reversed(self.trackers):
d = trk.get_state()[0]
if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
ret.append(np.concatenate((d, [trk.id + 1])).reshape(1, -1))
i -= 1
if trk.time_since_update > self.max_age:
self.trackers.pop(i)
if len(ret) > 0:
return np.concatenate(ret)
return np.empty((0, 5))
yoloV3模型
yoloV3以V1,V2为基础进行的改进,主要有:利用多尺度特征进行目标检测;先验框更丰富;调整了网络结构;对象分类使用logistic代替了softmax,更适用于多标签分类任务。
算法简介
YOLOv3是YOLO (You Only Look Once)系列目标检测算法中的第三版,相比之前的算法,尤其是针对小目标,精度有显著提升。 yoloV3的流程如下图所示,对于每一幅输入图像,YOLOv3会预测三个不同尺度的输出,目的是检测出不同大小的目标。
多尺度检测
通常一幅图像包含各种不同的物体,并且有大有小。比较理想的是一次就可以将所有大小的物体同时检测出来。因此,网络必须具备能够“看到”不同大小的物体的能力。因为网络越深,特征图就会越小,所以网络越深小的物体也就越难检测出来。
在实际的feature map中,随着网络深度的加深,浅层的feature map中主要包含低级的信息(物体边缘,颜色,初级位置信息等),深层的feature map中包含高等信息(例如物体的语义信息:狗,猫,汽车等等)。因此在不同级别的feature map对应不同的scale,所以我们可以在不同级别的特征图中进行目标检测。如下图展示了多种scale变换的经典方法。 (a) 这种方法首先建立图像金字塔,不同尺度的金字塔图像被输入到对应的网络当中,用于不同scale物体的检测。但这样做的结果就是每个级别的金字塔都需要进行一次处理,速度很慢,在SPPNet使用的就是这种方式。
(b) 检测只在最后一层feature map阶段进行,这个结构无法检测不同大小的物体
? 对不同深度的feature map分别进行目标检测。SSD中采用的便是这样的结构。这样小的物体会在浅层的feature map中被检测出来,而大的物体会在深层的feature map被检测出来,从而达到对应不同scale的物体的目的,缺点是每一个feature map获得的信息仅来源于之前的层,之后的层的特征信息无法获取并加以利用。
(d) 与?很接近,但不同的是,当前层的feature map会对未来层的feature map进行上采样,并加以利用。因为有了这样一个结构,当前的feature map就可以获得“未来”层的信息,这样的话低阶特征与高阶特征就有机融合起来了,提升检测精度。在YOLOv3中,就是采用这种方式来实现目标多尺度的变换的。
网络模型结构
在基本的图像特征提取方面,YOLO3采用了Darknet-53的网络结构(含有53个卷积层),它借鉴了残差网络ResNet的做法,在层之间设置了shortcut,来解决深层网络梯度的问题,shortcut如下图所示:包含两个卷积层和一个shortcut connections。 yoloV3的模型结构如下所示 整个v3结构里面,没有池化层和全连接层,网络的下采样是通过设置卷积的stride为2来达到的,每当通过这个卷积层之后图像的尺寸就会减小到一半。残差模块中的1×,2×,8×,8× 等表示残差模块的个数。
先验框
yoloV3采用K-means聚类得到先验框的尺寸,为每种尺度设定3种先验框,总共聚类出9种尺寸的先验框。 在COCO数据集这9个先验框是:(10x13),(16x30),(33x23),(30x61),(62x45),(59x119),(116x90),(156x198),(373x326)。在最小的(13x13)特征图上(有最大的感受野)应用较大的先验框(116x90),(156x198),(373x326),适合检测较大的对象。中等的(26x26)特征图上(中等感受野)应用中等的先验框(30x61),(62x45),(59x119),适合检测中等大小的对象。较大的(52x52)特征图上(较小的感受野)应用,其中较小的先验框(10x13),(16x30),(33x23),适合检测较小的对象。
直观上感受9种先验框的尺寸,下图中蓝色框为聚类得到的先验框。黄色框式ground truth,红框是对象中心点所在的网格。
ligistic回归
预测对象类别时不使用softmax,而是被替换为一个1x1的卷积层+logistic激活函数的结构。使用softmax层的时候其实已经假设每个输出仅对应某一个单个的class,但是在某些class存在重叠情况(例如woman和person)的数据集中,使用softmax就不能使网络对数据进行很好的预测。
yoloV3模型的输入与输出
YoloV3的输入输出形式如下图所示 输入416×416×3的图像,通过darknet网络得到三种不同尺度的预测结果,每个尺度都对应N个通道,包含着预测的信息;
每个网格每个尺寸的anchors的预测结果。
YOLOv3共有13×13×3 + 26×26×3 + 52×52×3个预测 。每个预测对应85维,分别是4(坐标值)、1(置信度分数)、80(coco类别概率)。
基于yoloV3的目标检测
在这里我们进行的目标检测是基于OPenCV的利用yoloV3进行目标检测,不涉及yoloV3的模型结构、理论及训练过程,只是利用训练好的模型进行目标检测
labelsPath = "./yolo-coco/coco.names"
LABELS = open(labelsPath).read().strip().split("\n")
np.random.seed(42)
COLORS = np.random.randint(0, 255, size=(200, 3),dtype="uint8")
weightsPath = "./yolo-coco/yolov3.weights"
configPath = "./yolo-coco/yolov3.cfg"
net = cv2.dnn.readNetFromDarknet(configPath, weightsPath)
ln = net.getLayerNames()
ln = [ln[i[0] - 1] for i in net.getUnconnectedOutLayers()]
frame = cv2.imread("./images/car1.jpg")
(W, H) = (None, None)
if W is None or H is None:
(H, W) = frame.shape[:2]
blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
net.setInput(blob)
start = time.time()
layerOutputs = net.forward(ln)
boxes = []
confidences = []
classIDs = []
for output in layerOutputs:
for detection in output:
scores = detection[5:]
classID = np.argmax(scores)
confidence = scores[classID]
if confidence > 0.3:
box = detection[0:4] * np.array([W, H, W, H])
(centerX, centerY, width, height) = box.astype("int")
x = int(centerX - (width / 2))
y = int(centerY - (height / 2))
boxes.append([x, y, int(width), int(height)])
confidences.append(float(confidence))
classIDs.append(classID)
idxs = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.3)
- 最终检测结果,绘制,并存入到ndarray中,供目标追踪使用
dets = []
if len(idxs) > 0:
for i in idxs.flatten():
if LABELS[classIDs[i]] == "car":
(x, y) = (boxes[i][0], boxes[i][1])
(w, h) = (boxes[i][2], boxes[i][3])
cv2.rectangle(frame, (x, y), (x+w, y+h), (0,255,0), 2)
dets.append([x, y, x + w, y + h, confidences[i]])
np.set_printoptions(formatter={'float': lambda x: "{0:0.3f}".format(x)})
dets = np.asarray(dets)
plt.imshow(frame[:,:,::-1])
labelsPath = "./yolo-coco/coco.names"
LABELS = open(labelsPath).read().strip().split("\n")
np.random.seed(42)
COLORS = np.random.randint(0, 255, size=(200, 3),dtype="uint8")
weightsPath = "./yolo-coco/yolov3.weights"
configPath = "./yolo-coco/yolov3.cfg"
net = cv2.dnn.readNetFromDarknet(configPath, weightsPath)
ln = net.getLayerNames()
ln = [ln[i[0] - 1] for i in net.getUnconnectedOutLayers()]
"""
视频处理类
"""
vs = cv2.VideoCapture('./input/test_1.mp4')
(W, H) = (None, None)
writer = None
try:
prop = cv2.cv.CV_CAP_PROP_FRAME_COUNT if imutils.is_cv2() \
else cv2.CAP_PROP_FRAME_COUNT
total = int(vs.get(prop))
print("[INFO] {} total frames in video".format(total))
except:
print("[INFO] could not determine # of frames in video")
print("[INFO] no approx. completion time can be provided")
total = -1
while True:
(grabbed, frame) = vs.read()
if not grabbed:
break
if W is None or H is None:
(H, W) = frame.shape[:2]
blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
net.setInput(blob)
start = time.time()
layerOutputs = net.forward(ln)
"""
输出layerOutsputs介绍:
是YOLO算法在图片中检测到的bbx的信息
由于YOLO v3有三个输出,也就是上面提到的['yolo_82', 'yolo_94', 'yolo_106']
因此layerOutsputs是一个长度为3的列表
其中,列表中每一个元素的维度是(num_detection, 85)
num_detections表示该层输出检测到bbx的个数
85:因为该模型在COCO数据集上训练,[5:]表示类别概率;[0:4]表示bbx的位置信息;[5]表示置信度
"""
end = time.time()
"""
下面对网络输出的bbx进行检查:
判定每一个bbx的置信度是否足够的高,以及执行NMS算法去除冗余的bbx
"""
boxes = []
confidences = []
classIDs = []
for output in layerOutputs:
for detection in output:
scores = detection[5:]
"""
# scores = detection[5:] ---> [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0. 0.]
# scores的大小应该是1*80,因为在训练yolo模型时是80类目标
"""
classID = np.argmax(scores)
confidence = scores[classID]
if confidence > 0.3:
box = detection[0:4] * np.array([W, H, W, H])
(centerX, centerY, width, height) = box.astype("int")
x = int(centerX - (width / 2))
y = int(centerY - (height / 2))
boxes.append([x, y, int(width), int(height)])
confidences.append(float(confidence))
classIDs.append(classID)
idxs = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.3)
dets = []
if len(idxs) > 0:
for i in idxs.flatten():
if LABELS[classIDs[i]] == "car":
(x, y) = (boxes[i][0], boxes[i][1])
(w, h) = (boxes[i][2], boxes[i][3])
dets.append([x, y, x + w, y + h, confidences[i]])
np.set_printoptions(formatter={'float': lambda x: "{0:0.3f}".format(x)})
dets = np.asarray(dets)
车流量统计
基于虚拟线圈法的车辆统计
基于虚拟线圈的车流量统计算法原理与交通道路上的常见的传统的物理线圈类似,由于物理线圈需要埋设在路面之下,因此会有安装、维护费用高,造成路面破坏等问题,而采用基于视频的虚拟线圈的车辆计数方法完全避免了以上问题,且可以针对多个感兴趣区域进行检测。
虚拟线圈车辆计数法的原理是在采集到的交通流视频中,在需要进行车辆计数的道路或路段上设置一条或一条以上的检测线对通过车辆进行检测,从而完成计数工作。检测线的设置原则一般是在检测车道上设置一条垂直于车道线,居中的虚拟线段,通过判断其与通过车辆的相对位置的变化,完成车流量统计的工作。如下图所示,绿色的线就是虚拟检测线: 在该项目中我们进行检测的方法是,计算前后两帧图像的车辆检测框的中心点连线,若该连线与检测线相交,则计数加一,否则计数不变。
那怎么判断两条线段是否相交呢?
假设有两条线段AB,CD,若AB,CD相交,我们可以确定:
1.线段AB与CD所在的直线相交,即点A和点B分别在直线CD的两边;
2.线段CD与AB所在的直线相交,即点C和点D分别在直线AB的两边;
上面两个条件同时满足是两线段相交的充要条件,所以我们只需要证明点A和点B分别在直线CD的两边,点C和点D分别在直线AB的两边,这样便可以证明线段AB与CD相交了。 在上图中,线段AB与线段CD相交,于是我们可以得到两个向量AC,AD,C和D分别在AB的两边,向量AC在向量AB的逆时针方向,AB×AC > 0;向量AD在向量AB的顺时针方向,AB×AD < 0,两叉乘结果异号。
这样,方法就出来了:如果线段CD的两个端点C和D,与另一条线段的一个端点(A或B,只能是其中一个)连成的向量,与向量AB做叉乘,若结果异号,表示C和D分别在直线AB的两边,若结果同号,则表示CD两点都在AB的一边,则肯定不相交。
所以我们利用叉乘的方法来判断车辆是否经过检测线。
实现
def intersect(A, B, C, D):
return ccw(A, C, D) != ccw(B, C, D) and ccw(A, B, C) != ccw(A, B, D)
def ccw(A, B, C):
return (C[1] - A[1]) * (B[0] - A[0]) > (B[1] - A[1]) * (C[0] - A[0])
for box in boxes:
(x, y) = (int(box[0]), int(box[1]))
(w, h) = (int(box[2]), int(box[3]))
color = [int(c) for c in COLORS[indexIDs[i] % len(COLORS)]]
cv2.rectangle(frame, (x, y), (w, h), color, 2)
"""
根据当前帧的检测结果,与上一帧检测的检测结过,进行虚拟线圈完成车辆计数:车流量统计
"""
if indexIDs[i] in previous:
previous_box = previous[indexIDs[i]]
(x2, y2) = (int(previous_box[0]), int(previous_box[1]))
(w2, h2) = (int(previous_box[2]), int(previous_box[3]))
p0 = (int(x + (w - x) / 2), int(y + (h - y) / 2))
p1 = (int(x2 + (w2 - x2) / 2), int(y2 + (h2 - y2) / 2))
cv2.line(frame, p0, p1, color, 3)
"""
进行碰撞检测-前后两帧检测框中心点的连线穿过基准线,则进行计数
"""
if intersect(p0, p1, line[0], line[1]):
counter += 1
if y2 < y:
counter_down += 1
else:
counter_up += 1
视频中的车流量统计
前面我们已经完成了视频中车辆的检测功能,下面我们对车辆进行跟踪,并将跟踪结果绘制在视频中
if np.size(dets) == 0:
continue
else:
tracks = tracker.update(dets)
boxes = []
indexIDs = []
previous = memory.copy()
memory = {}
for track in tracks:
boxes.append([track[0], track[1], track[2], track[3]])
indexIDs.append(int(track[4]))
memory[indexIDs[-1]] = boxes[-1]
cv2.line(frame, line[0], line[1], (0, 255, 0), 3)
cv2.putText(frame, str(counter), (30, 80), cv2.FONT_HERSHEY_DUPLEX, 3.0, (255, 0, 0), 3)
cv2.putText(frame, str(counter_up), (130, 80), cv2.FONT_HERSHEY_DUPLEX, 3.0, (0, 255, 0), 3)
cv2.putText(frame, str(counter_down), (230, 80), cv2.FONT_HERSHEY_DUPLEX, 3.0, (0, 0, 255), 3)
if writer is None:
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter("./output/output.mp4",
fourcc,
30,
(frame.shape[1], frame.shape[0]),
True)
writer.write(frame)
cv2.imshow("", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
|