基于多分类线性SVM&mediapipe手势关键点实现简易人机猜拳游戏
在【机器学习算法】支持向量机入门教程及相关数学推导 这篇博客中,我们已经完整的推导了SVM算法以及SMO序列最小化优化算法的来龙去脉。在本篇博客中,我们将基于这些理论基础,手写代码实现SVM算法的完整实现过程。并基于分类结果进行数据+决策面的可视化;同时,填一下上上上上上篇博客挖的坑,实现一个有趣的小应用:上次是基于逻辑斯蒂回归,这次我们用SVM算法来进行手势识别。
本次实验的所有代码已上传至github:https://github.com/Scienthusiasts/Machine-Learning 先贴一张实现效果:
基于SMO优化的SVM分类算法完整实现版本
本次手写代码的主要理论基础在【机器学习算法】支持向量机入门教程及相关数学推导中都有比较详尽的解释,推荐这两篇博客可以对照着看,相信对大家一定会有不一样的体会与理解。🎈
首先,我们构造一个类来实现SVM算法,类中包括算法的超参数,待优化的模型参数,以及模型用到的训练样本等:
class SVM:
def __init__(self, X, y, C, ε, ker):
self.X = X
self.y = y
self.C = C
self.ε = ε
self.m = X.shape[0]
self.α = np.zeros((self.m, 1))
self.b = 0
self.ker = ker
self.eCache = np.zeros((self.m, 2))
然后,SVM一个必不可少的通过样本的高维映射实现非线性划分的方法就是引入核函数。
核函数的种类有许多,在本例中我们采用最简单和常用的两种:线性核和高斯核
def K(self, xi, xj, param=0):
if param == 0:
xj = xj.reshape(-1, 1)
return np.dot(xi, xj)
if param == 1:
σ = 1.3
deltaRow = xi - xj
ker = np.exp(-np.dot(deltaRow, deltaRow.T) / (2 * σ * σ))
return ker
在SMO优化过程中,为了更新后的α2能用更新前的α2表示,而不是不够直观的ζ,我们需要用到标签与模型预测结果之间的差值E。同时,启发式搜索αj也需要用到E。
因此我们需要定义一个方法来实现对E的求解:
def calcEk(self, k):
fk = np.sum([self.α[i] * self.y[i] * self.K(self.X[i,:], self.X[k,:], self.ker) for i in range(self.m)]) + self.b
Ek = fk.reshape(1) - self.y[k]
return Ek
由于我们定义了一个误差缓存eCache 来记录这些误差,因此顺便实现一个误差更新方法:
def updateEk(self, k):
Ek = self.calcEk(k)
self.eCache[k,0] = 1
self.eCache[k,1] = Ek
在SMO选取参数αj的过程中,使用的是基于误差E最小的启发式的方法,这里定义一个方法实现:
def selectJRand(self, i, m):
j = i
while(j==i):
j = int(random.uniform(0, m))
return j
def selectJ(self, i, Ei):
bestJ = -1
maxDeltaE = 0
Ej = 0
validE = np.nonzero(self.eCache[0,:])[0]
if len(validE) > 1:
for k in validE:
if k == i: continue
Ek = self.calcEk(k)
deltaE = abs(Ei - Ek)
if deltaE > maxDeltaE:
bestJ = k
maxDeltaE = deltaE
Ej = Ek
return bestJ, Ej
else:
j = self.selectJRand(i, self.m)
Ej = self.calcEk(j)
return j, Ej
在SMO求得αj的值后,由于约束条件的存在,我们还需要对违背约束条件的αj进行截断:
def clipAlpha(self, αj, H, L):
if αj > H:
αj = H
if αj < L:
αj = L
return αj
在定义了一些零零碎碎但是必不可少的方法后,接下来就是对基于一次迭代的SMO优化算法流程的完整实现(更新一对参数):
def innerL(self, i):
Ei = self.calcEk(i)
if ((self.y[i]*Ei < -self.ε) and (self.α[i] < self.C)) or ((self.y[i]*Ei > self.ε) and (self.α[i] > 0)):
j, Ej = self.selectJ(i, Ei)
old_αi, old_αj = self.α[i].copy(), self.α[j].copy()
K11 = self.K(self.X[i,:], self.X[i,:], self.ker)
K12 = self.K(self.X[i,:], self.X[j,:], self.ker)
K22 = self.K(self.X[j,:], self.X[j,:], self.ker)
if self.y[i] != self.y[j]:
L = max(0, self.α[j] - self.α[i])
H = min(self.C, self.C + self.α[j] - self.α[i])
else:
L = max(0, self.α[j] + self.α[i] - self.C)
H = min(self.C, self.α[j] + self.α[i])
if(L == H): print("出现 L = H = %f, i = %d, j = %d, αj=%f" %(L, i, j, self.α[j])); return 0
η = -2.0 * K12 + K11 + K22
if η <= 0: print("η <= 0"); return 0
self.α[j] += self.y[j] * (Ei - Ej) / η
self.α[j] = self.clipAlpha(self.α[j], H, L)
self.updateEk(j)
if(np.abs(self.α[j] - old_αj) < 1e-5):
print("j 更新的太少")
return 0
self.α[i] += self.y[i] * self.y[j] * (old_αj - self.α[j])
D_αi, D_αj = (self.α[i] - old_αi), (self.α[j] - old_αj)
b1 = - Ei - self.y[i] * K11 * D_αi - self.y[j] * K12 * D_αj + self.b
b2 = - Ej - self.y[i] * K12 * D_αi - self.y[j] * K22 * D_αj + self.b
if 0 < self.α[i] < self.C: self.b = b1
elif 0 < self.α[j] < self.C: self.b = b2
else: self.b = (b1 + b2) / 2.0
return 1
else: return 0
启发式的寻找αi,将innerL(self, i) 写在迭代流中,并实时输出迭代更新的参数信息;模型收敛或达到最大迭代次数后退出。我们将这一过程封装成训练方法:
def train(self, maxIter):
iter = 0
entireSet = True
αPairsChanged = 0
while(iter < maxIter) and ((αPairsChanged > 0) or (entireSet)):
αPairsChanged = 0
if entireSet:
print("===========================全数据集遍历===========================")
for i in range(self.m):
αPairsChanged += self.innerL(i)
print("Iteration:%d | choosed i=%d | 已更新的参数对数:%d" % (iter, i, αPairsChanged))
iter += 1
else:
print("===========================非边界值遍历===========================")
nonBoundIs = np.nonzero((self.α>0) * (self.α<self.C))[0]
for i in nonBoundIs:
αPairsChanged += self.innerL(i)
print("Iteration:%d | choosed i=%d | 已更新的参数对数:%d" % (iter, i, αPairsChanged))
iter += 1
if entireSet:
entireSet = False
elif (αPairsChanged == 0):
entireSet = True
if(iter >= maxIter):print("超过最大迭代次数,算法结束")
if not((αPairsChanged < 0) or (entireSet)):print("算法收敛")
if(self.ker == 0):
self.W = np.zeros((self.X.shape[1], 1))
for i in range(self.m):
self.W += self.α[i] * self.y[i] * self.X[i,:].reshape(-1,1)
return self.W, self.b
return self.b, self.α
在模型训练完毕后,可以保存模型权重以便离线使用:
值得注意的是,能够使用参数W和b的模型仅限于线性SVM(或线性核),对于非线性SVM而言,由于核函数已经将样本X映射到了高维空间,因此我们只能基于参数α,训练集X, b进行模型的推理,而不能基于原始的样本特征。由于训练集X涉及的样本量可能很大,因此本篇博客中我们暂不考虑基于预训练非线性SVM的推理实现。
def save_weight_lin(self, path):
weight = np.concatenate((self.W, self.b.reshape(-1,1)), axis=0)
np.save(path, weight)
同样,模型的预测(评估验证集)也可以分为两种推理方式,基于W和b的线性SVM,基于α,训练集X, b的非线性SVM:
def eval(self, test_X):
res = []
sv_index = np.nonzero(self.α)[0]
for j in range(test_X.shape[0]):
res.append(self.b + np.sum([self.α[i] * self.y[i] * self.K(self.X[i,:], test_X[j,:], self.ker) for i in sv_index]))
return np.array(res)
@staticmethod
def linear_eval(test_X, W, b):
res = b + np.dot(test_X, W)
return res
SVM决策结果与数据集可视化
在书《机器学习实战》中,一共提供了两个例子供我们实验。一个好处是,这些数据是二维的,因此我们可以先可视化看看这些数据的分布特征:
显而易见,数据集1是线性可分的,数据集2是线性不可分的。对于线性可分的数据,采用线性SVM就可以完美的解决问题,对于那些线性不可分的数据,我们需要利用核映射来解决。
测试代码:
if __name__ == '__main__':
kernelType = 0
X, y = loadDataSet('./testSet.txt')
y = y.reshape(-1,1)
svm = SVM( X, y, C=0.6, ε=1e-4, ker=kernelType)
_, b, α = svm.train(maxIter=1)
print(α)
res = svm.eval(X)
acc = sum(res*y>0) / X.shape[0]
visualModel2D(X, y, b, α, kernelType)
可视化模型决策结果(数据集1,线性SVM):
图中圈起来的点代表支持向量,值得注意的是,基于软间隔SVM的支持向量不一定恰好都落在间隔边界上,这是因为软间隔SVM具有一定的容错力,也就是说,最大间隔可能嵌入在样本之中(总之模型肯定会帮助你找到一条最优直线,这条直线是容错率与模型泛化能力的tradeoff)
【但是说实话我还是不太理解为什么线性SVM的支持向量没有恰好在一条直线上,或许是和松弛变量有关。。🐶】
… …
Iteration:0 | choosed i=97 | 已更新的参数对数:4 Iteration:0 | choosed i=98 | 已更新的参数对数:4 Iteration:0 | choosed i=99 | 已更新的参数对数:4 超过最大迭代次数,算法结束 算法收敛 准确率: 1.000000 支持向量个数: 6
对于数据集2而言,可以使用高斯核来引入非线性SVM。
可视化模型决策结果(数据集2,非线性SVM):
Iteration:0 | choosed i=94 | 已更新的参数对数:26 Iteration:0 | choosed i=97 | 已更新的参数对数:27 Iteration:0 | choosed i=98 | 已更新的参数对数:27 出现 L = H = 0.000000, i = 99, j = 53, αj=0.000000 Iteration:0 | choosed i=99 | 已更新的参数对数:27 超过最大迭代次数,算法结束 算法收敛 准确率: 0.990000 支持向量个数: 37
一般而言,对于线性可分的数据,在其任意高维的某个特征空间中都一定是线性可分的,因此我们可以对数据集1引入高斯核:
可视化模型决策结果(数据集1,非线性SVM):
Iteration:0 | choosed i=98 | 已更新的参数对数:15 出现 L = H = 0.000000, i = 99, j = 85, αj=0.000000 Iteration:0 | choosed i=99 | 已更新的参数对数:15 超过最大迭代次数,算法结束 算法收敛 准确率: 0.990000 支持向量个数: 21
多分类SVM实战:基于mediapipe手势关键点实现简易版人机猜拳
在上一节机器学习实战中,我们利用逻辑斯蒂回归算法实现了二分类手势识别。在本节中,我们将核心算法改为线性SVM,实现手势三分类。具体的数据采集方法以及如何使用mediapipe工具包在上一次博客中已经有了详细的说明,感兴趣的小伙伴可以点击传送门:【机器学习实验四】基于Logistic Regression二分类算法实现手部姿态识别
二分类算法如何实现多分类
SVM和逻辑斯蒂回归算法有一个相同点,那就是它们都只能执行二分类任务,要实现对于两个以上类别的多分类,我们可以采取一些特殊的方法:
1. OVO(one versus one)
ovo即一对一的方法,我们可以将多个类别两两组合,假设有n个类别,两两组合就能产生n(n-1)/2个数据集,这时候我们需要对每对组合的数据集训练一个模型,总共训练n(n-1)/2个模型。在每次预测时,采用投票的方式选择预测类别最多的那个类别作为最终的预测结果。
基于ovo训练方法的缺点在于,一旦数据类别一多,需要训练的模型的个数将以平方级别增长,因此这种方法适用于小类别下的简单数据集。
下面这张图很直观的表达了ovo数据集划分方法:
2.OVR(one versus rest)
ovr即一对多方法,在数据划分时依次将某个类别的样本归为正例而其余的样本归为负例。假设有n个类别,我们就需要训练n个模型。在每次预测时,选择唯一分为正例的模型对应的类别作为最终预测结果。如果同时有多个模型预测出正例,则选取期望值最大的那个(对于SVM就是数值>0且越大越好)
基于ovr训练方法的缺点也比较明显,那就是正负例的样本很有可能分布不均衡,负例的样本数通常比较大。
下面这张图很直观的表达了ovr数据集划分方法:
基于OVO方法训练三分类SVM
在本次实战中,我们选择OVO方法进行数据划分,一共需要训练三个模型,即三组权重。
首先需要将三个类别的数据两两组合,分成三个数据集:
cloth_X = np.load("./cloth_dist.npy")
stone_X = np.load("./stone_dist.npy")
scissors_X = np.load("./scissors_dist.npy")
y = np.ones(cloth_X.shape[0])
X0 = np.concatenate((cloth_X,stone_X), axis=0).reshape(-1,21*21)
X1 = np.concatenate((cloth_X,scissors_X), axis=0).reshape(-1,21*21)
X2 = np.concatenate((stone_X,scissors_X), axis=0).reshape(-1,21*21)
y = np.concatenate((y,-y), axis=0).reshape(-1,1)
X012 = np.concatenate((X0,scissors_X.reshape(-1,21*21)), axis=0).reshape(-1,21*21)
y012 = np.concatenate((np.zeros(cloth_X.shape[0]),np.ones(cloth_X.shape[0])), axis=0)
y012 = np.concatenate((y012,np.ones(cloth_X.shape[0])*2), axis=0)
print(X012.shape, y012.shape)
(1500, 441) (1500,)
然后,对每个数据集训练一组权重:
kernelType = 0
Xs = {0:X0, 1:X1, 2:X2}
for i in Xs.keys():
svm = SVM( Xs[i], y, C=600, ε=1e-4, ker=kernelType)
W, b = svm.train(maxIter=1)
svm.save_weight_lin('./W&b_'+str(i)+'.npy')
验证精度:
res0 = svm.linear_eval(X012, W0, b0).reshape(-1)>0
res1 = svm.linear_eval(X012, W1, b1).reshape(-1)>0
res2 = svm.linear_eval(X012, W2, b2).reshape(-1)>0
pred = []
for i in range(len(res0)):
if(res0[i] and res1[i]):pred.append(0)
elif(not res0[i] and res2[i]):pred.append(1)
elif(not res1[i] and not res2[i]):pred.append(2)
else:pred.append(-1)
acc = sum(pred==y012) / X012.shape[0]
print("准确率: %f" % (acc))
准确率: 0.998000
测试(嵌入mediapipe手部关键点提取代码中)
如何实现人机交互式猜拳呢,哈哈其实很简单,只需要将模型识别手势的类别改为相克的属性即可🤣🤣
在代码首嵌入以下内容:
cls = {0:img2, 1:img0, 2:img1}
clstext1 = {0:"Paper", 1:"Rock", 2:"Scissors"}
clstext2 = {0:"Scissors", 1:"Paper", 2:"Rock"}
weight0 = np.load('./W&b_0.npy')
W0, b0 = weight0[:-1], weight0[-1]
weight1 = np.load('./W&b_1.npy')
W1, b1 = weight1[:-1], weight1[-1]
weight2 = np.load('./W&b_2.npy')
W2, b2 = weight2[:-1], weight2[-1]
def eval(X):
res0 = SVM.linear_eval(X, W0, b0).reshape(-1)>0
res1 = SVM.linear_eval(X, W1, b1).reshape(-1)>0
res2 = SVM.linear_eval(X, W2, b2).reshape(-1)>0
if(res0 and res1):pred = 0
elif(not res0 and res2):pred = 1
elif(not res1 and not res2):pred = 2
else:pred = -1
return pred
其中cls = {0:img2, 1:img0, 2:img1} 用来选择三张图片用来表示计算机方的选择(石头,剪刀,布):
在手部关键点提取代码下加上这几行代码即可:
... ...
from deal_hand_frame import distance
import sys;sys.path.append('../')
from logistic import logistic
... ...
X = distance(keypoint.reshape(1,21,3)).reshape(1,-1)
pred = eval(X)
w, h,_ = cls[pred].shape
image[:w, -h:, :] = cls[pred]
cv2.putText(image, 'your:' + clstext1[pred], (10,50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
cv2.putText(image, 'computer:' + clstext2[pred], (10,100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
... ...
测试效果展示(赢是不可能赢的):
在后续改进中,也可以针对两名人类玩家的对弈实现一个猜拳积分系统。
|