密度峰值聚类(Density Peak Cluster,DPC)——Python实现
时间:2022/6/29
1.算法描述
关于密度峰值聚类的算法思想,大师兄的博客中已经基本介绍了机器学习之Density Peaks_因吉的博客-CSDN博客_density peaks。在这里我就简述一下。
对于样本空间而言,可以通过一定的距离度量来计算样本之间的距离。以当前样本为中心使用一个半径便可以计算样本周围的其他样本数量,便可衡量样本周围的密度。密度聚类算法便基于此。统计所有样本的密度,便可得到一个密度峰值图。
将数据样本按照某一方向映射到x轴上,y轴为以其为中心,以r为半径计算的密度。由上图可以看到,样本1和样本2均是峰值点,说明其周围的样本数在局部范围是最多的。按照聚类的思想:最大化类内相似度,最小化类间相似度,选取样本1和样本2作为聚类中心,便符合聚类的思想,可以在局部范围聚集最多的样本。虽然样本3的密度是大于样本1的,但是可以明显看出,样本3是隶属于样本2的,所以样本2是样本3的master(密度比当前样本大且距离最近的样本),故样本3在划分时并不能作为聚类中心,只能依附于样本2的簇。
故如何选取样本作为簇中心,便是密度峰值聚类算法的核心问题。从上图可以得到,一个样本具有密度和距离两种度量,密度便是该样本周围聚集的样本数量,体现着该样本的重要性。而距离则是该样本距离其master的距离,正所谓“天高皇帝远”,样本距离其master的距离越远,说明其独立性也就越高,如图中的样本1,其距离master样本2的距离很远,故它可以作为一个聚类中心,聚集成簇,而样本3聚类其master样本2的距离太近,被管的太严,不能成为距离中心。因此,样本的优先级则是其密度和距离的综合,这里简单的将二者相乘,得到其优先级:
p
r
i
o
r
i
t
y
=
d
e
n
s
i
t
y
?
d
i
s
t
a
n
c
e
(2)
priority=density*distance\tag{2}
priority=density?distance(2)
2.算法流程
1.计算数据集的距离矩阵
2.根据距离矩阵,计算每个样本的密度
3.根据距离矩阵和密度向量计算每个样本的master和当前样本到其master的距离
4.根据密度向量和样本到其master的距离计算样本的优先级
5.选取优先级最大的前k个样本作为聚类中心
6.给样本赋予簇号,样本没有簇号便使用其master的簇号作为自己的簇号
7.根据样本的簇号将数据集划分成k个簇
3.算法实现
import numpy as np
import pandas as pd
class DensityPeak:
"""
密度峰值聚类算法
"""
def __init__(self, distanceMatrix, dcRatio=0.2, clusterNumRatio=0.05, dcType="max", kernel="gaussian"):
'''
构造器,初始化相关参数
:param distanceMatrix: 数据集的距离矩阵
:param dcRatio: 半径比率 通常是0.2
:param dcType: 半径计算类型 包括‘max’,'ave','min' Hausdorff距离等
:param kernel: 密度计算时选取的计算函数 包括'cutoff-kernel' 'gaussian-kernel'
'''
self.distance_m = distanceMatrix
self.dcRatio_f = dcRatio
self.dcType = dcType
self.kernel = kernel
self.clusterCenterRatio_f = clusterNumRatio
self.densities_l = []
self.masters_l = []
self.distanceToMaster_l = []
self.representativeness_l = []
self.clusterCenter_l = []
self.numSample = 0
self.dc_f = 0
self.maxDistance = 0
self.label_l = []
self.clusters_d = {}
self.__initDensityPeak()
def __initDensityPeak(self):
'''
初始化
:return:
'''
self.numSample = len(self.distance_m)
self.maxDistance = self.getMaxDistance()
self.dc_f = self.getDc()
self.densities_l = self.computeDensities()
self.computeDistanceToMaster()
self.computePriority()
def getDc(self):
'''
计算半径dc
:return:
'''
resultDc = 0.0
match self.dcType:
case "max":
'''
计算最大Hausdorff距离
'''
resultDc = self.maxDistance
case "ave":
'''
平均Hausdorff距离
'''
resultDc = np.mean(self.distance_m)
case "min":
'''
最小Hausdorff距离
'''
resultDc = np.min(self.distance_m)
return resultDc * self.dcRatio_f
def getMaxDistance(self):
'''
计算实例间最大距离
:return:
'''
return np.max(self.distance_m)
def computeDensities(self):
'''
计算密度,按照给定的kernel进行计算
:return:
'''
if self.kernel == 'gaussian':
temp_local_density_list = np.sum(1 / (np.exp(np.power(self.distance_m / self.dc_f, 2))), axis=1)
return temp_local_density_list
elif self.kernel == 'cutoff':
temp_local_density_list = []
for i in range(0, self.numSample):
temp_local_density_list.append(self.cutoff_kernel(i))
return temp_local_density_list
def gaussian_kernel(self, i):
'''
高斯核计算密度
:param i: 实例标号
:return: 密度
'''
tempDensity = 0
for j in range(len(self.distance_m[i])):
tempDistance = self.distance_m[i][j]
tempDensity += np.exp(-(tempDistance / self.dc_f) ** 2)
return tempDensity
def cutoff_kernel(self, i):
'''
截断核计算密度
:param i: 实例标号
:return: 密度
'''
tempDensity = 0
for j in range(len(self.distance_m[i])):
tempDistance = self.distance_m[i][j]
tempDensity += self.F(tempDistance - self.dc_f)
return tempDensity
def F(self, x):
'''
截断核计算辅助函数
:param x: 距离差值
:return:
'''
if x < 0:
return 1
else:
return 0
def computeDistanceToMaster(self):
'''
计算实例到master的距离,同时确定实例的master
:return:
'''
tempSortDensityIndex = np.argsort(self.densities_l)[::-1]
self.distanceToMaster_l = np.zeros(self.numSample)
self.distanceToMaster_l[tempSortDensityIndex[0]] = float('inf')
self.masters_l = np.zeros(self.numSample, dtype=int)
self.masters_l[tempSortDensityIndex[0]] = 0
for i in range(1, self.numSample):
tempIndex = tempSortDensityIndex[i]
self.masters_l[tempIndex] = tempSortDensityIndex[
np.argmin(self.distance_m[tempIndex][tempSortDensityIndex[:i]])]
self.distanceToMaster_l[tempIndex] = np.min(self.distance_m[tempIndex][tempSortDensityIndex[:i]])
print(self.masters_l)
def computePriority(self):
'''
计算代表性(优先级)
:return:
'''
self.representativeness_l = np.multiply(self.densities_l, self.distanceToMaster_l)
def getLabel(self, i):
'''
获取实例的标签
:param i: 实例标号
:return: 实例聚类标签
'''
if self.label_l[i] < 0:
return self.label_l[i]
else:
return self.getLabel(self.masters_l[i])
def getClusterCenter(self):
n = int(self.numSample * self.clusterCenterRatio_f)
return np.argsort(self.representativeness_l)[-n:][::-1]
def cluster(self):
'''
按照比例计算聚类簇中心个数 进行聚类
:param clusterRatio: 簇中心占比
:return:
'''
n = int(self.numSample * self.clusterCenterRatio_f)
self.cluster(n=n)
def cluster(self, n=3):
'''
按照给定的簇中心个数进行聚类
:param n: 簇中心个数
:return:
'''
self.label_l = np.zeros(self.numSample, dtype=int)
self.clusterCenter_l = np.argsort(self.representativeness_l)[-n:][::-1]
for i in range(n):
self.label_l[self.clusterCenter_l[i]] = -i - 1
for i in range(self.numSample):
if self.label_l[i] < 0:
continue
self.label_l[i] = self.getLabel(self.masters_l[i])
self.clusters_d = {key: [] for key in self.label_l[self.clusterCenter_l]}
for i in self.label_l[self.clusterCenter_l]:
self.clusters_d[i] += [j for j in range(self.numSample) if self.label_l[j] == i]
@staticmethod
def getDistanceByEuclid(instance1, instance2):
'''
按照欧氏距离计算实例间距离
:param instance1: 实例1
:param instance2: 实例2
:return: 欧氏距离
'''
dist = 0
for key in range(len(instance1)):
dist += (float(instance1[key]) - float(instance2[key])) ** 2
return dist ** 0.5
if __name__ == '__main__':
dataset = pd.read_csv('dataset/iris.csv')
distanceMartix = []
data = dataset.to_numpy()[:, 1:5]
for i in range(len(data)):
tempdistances_l = [DensityPeak.getDistanceByEuclid(data[i], data[j]) for j in range(len(data))]
distanceMartix.append(tempdistances_l)
distanceMartix = np.array(distanceMartix)
dp = DensityPeak(distanceMartix, 0.2, "max")
dp.cluster()
for key, value in dp.clusters_d.items():
print("簇号=", key, ",clustet= ", value)
print("簇长度=", len(value))
4. 算法测试
使用iris数据集进行测试
|