MLlib采用Scala语言编写,借助了函数式编程设计思想,开发人员在开发的过程中只需要关注数据,而不需要关注算法本身,因为算法都已经集成在里面了,所以只需要传递参数和调试参数。 MLlib主要包含两部分,分别是底层基础和算法库。其中底层基础包括spark的运行库、矩阵库、和向量库,向量接口和矩阵接口是基于Netlib和BLAS/LAPACK开发的线性代数库Breeze;算法库包括分类、回归、聚类、协同过滤和特征提取等算法。
spark中的机器学习流程大致分3个阶段, 数据准备阶段 训练模型评估阶段 部署预测阶段
数据类型
MLlib的主要数据类型包括: 本地向量 标注点 本地矩阵 本地向量与本地矩阵是提供公共接口的简单数据模型 Breeze和Jblas提供了底层的线性代数运算
本地向量: 密集向量,稀疏向量 它们俩对应不同的向量表达方式 密集向量由double类型的纯数字组成 而稀疏向量则是一种更加精简,省空间的表达 如(1.2,0.0,3.0)是密集向量 而(3,[0,2],[1.2,3.0]),就是把0抽出来不写,3表示向量长度为3,[0,2],是向量中非0维度的索引值,即向量0和2的为非0元素,[1.2,3.0]是按照索引排列的数组元素值。
本地向量的基类是vector,MLlib提供了Dense vector 和sparse vector类 创建方式如下 导入包
import org.apache.spark.mllib.linalg.{Vector,Vectors}
密集向量
val dv:Vector=Vectors.dense(1,2,3)
稀疏向量
val sv1:Vector=Vectors.sparse(3,Array(0,2),Array(1,3))
另一种创建方式,注意,这里我踩了坑,
val sv2:Vector=Vectors.sparse(3,Seq((0,2),(2,3)))
<console>:24: error: overloaded method value sparse with alternatives:
(size: Int,elements: Iterable[(Integer, java.lang.Double)])org.apache.spark.mllib.linalg.Vector <and>
(size: Int,elements: Seq[(Int, scala.Double)])org.apache.spark.mllib.linalg.Vector
cannot be applied to (Int, Seq[(Int, Int)])
因此必须用double类型
val sv2:Vector=Vectors.sparse(3,Seq((0,2.0),(2,3.0)))
标注点 标注点是一种带有标签点本地向量,标注点通常用于监督学习算法中,如果是二分类,那正样本标签为1.0,负样本标签为0.0;对于多分类问题,标签是一个0开始的引用序列,如0,1,2… 标注点实现类是org.apache.spark.mllib.regression.LabeledPoint 导入包
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
创建
val pos=LabeledPoint(1.0,Vectors.dense(1.0,0,3))
val neg=LabeledPoint(0.0,Vectors.sparse(3,Array(0,2),Array(1.0,3.0)))
本地矩阵
本地矩阵具有整型的行和列索引值以及Double类型的元素值,它存储在单个机器上。 密集矩阵:所有元素的值存储在一个列优先的双精度数组中,而稀疏矩阵则非0元素压缩到稀疏列格式中。 本地矩阵的基类是Matrix,DenseMatrix和SparseMatrix均是Matrix的继承类。
import org.apache.spark.mllib.linalg.{Matrix,Matrices}
val dm:Matrix=Matrices.dense(3,2,Array(1.0,3,5,2.0,4,6))
val sm:Matrix=Matrices.sparse(3,2,Array(0,1,3),Array(0,2,1),Array(9,6,8))
spark MLlib基本统计
摘要统计
count()
mean()
variance()
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariatesStatisticalSummary,Statistics}
创建密集矩阵
val observations=sc.parallelize(
Seq(
Vectors.dense(1.0,10.0,100.0),
Vectors.dense(2.0,20.0,200.0),
Vectors.dense(3.0,30.0,300.0)
)
)
计算列摘要统计信息,调用statistics类的colStats()方法,可以获得RDD[Vector]类的摘要统计。colStats()方法返回一个实例MulitvariateStatisticalSummary对象,该对象包含了列的最大值、最小值、平均值、方差、非零元素的数量以及总数。
val summary:MultivariateStatisticalSummary=Statistics.colStats(observations)
打印结果
println(summary.mean)
println(summary.variance)
println(summary.numNonzeros)
相关统计
import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd.RDD
val seriesX: RDD[Double]=sc.parallelize(Array(1,2,3,3,5))
val seriesY: RDD[Double]=sc.parallelize(Array(11,22,33,33,555))
计算X,Y之间的相关系数
val correlation: Double=Statistics.corr(seriesX,seriesY,"pearson")
利用皮尔森方法计算密集矩阵相关系数
val data:RDD[Vector]=sc.parallelize(
| Seq(
| Vectors.dense(1.0,10.0,100),
| Vectors.dense(2,20,200),
| Vectors.dense(5.0,33,366))
| )
val correlMatrix:Matrix=Statistics.corr(data,"pearson")
correlMatrix: org.apache.spark.mllib.linalg.Matrix =
1.0 0.9788834658894731 0.9903895695275673
0.9788834658894731 1.0 0.9977483233986101
0.9903895695275673 0.9977483233986101 1.0
打印
println(correlMatrix.toString)
分层抽样 分层抽样是先将总体样本安装某种特征分为若干层,然后再从每一层内进行独立抽样,组成一个样本的统计学计算方法。 创建RDD
val data=sc.parallelize(
| Seq((1,'a'),(1,'b'),(2,'c'),(2,'d'),(2,'e'),(3,'f')))
设定抽样格式
val fractions=Map(1->0.1,2->0.6,3->0.3)
从每层抽取样本
val approxSample=data.sampleByKey(withReplacement=false,fractions=fractions)
从每层抽取精确样本
val exactSample=data.sampleByKeyExact(withReplacement=false,fractions=fractions)
打印
approxSample.foreach(println)
exactSample.foreach(println)
分类
spark MLlib的两种线性分类方法 线性支持向量机 逻辑回归
线性支持向量机
import org.apache.spark.mllib.classification.{SVMModel,SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.util.MLUtils
加载spark官方提供数据集
val data=MLUtils.loadLibSVMFile(sc,"file:///export/servers/spark/data/mllib/sample_libsvm_data.txt")
将数据的60%分为训练数据,40%分为测试数据
val splits=data.randomSplit(Array(0.6,0.4),seed=11L)
val training=splits(0).cache()
val test=splits(1)
val numIterations=100
执行算法构建模型
val model = SVMWithSGD.train(training,numIterations)
用测试数据评估模型
val scoreAndLabels=test.map{point=> val score=model.predict(point.features)
(score,point.label)
)
获取评估指标
val metrics=new BianryClassificationMetrics(scoreAndLabels)
计算二元分类的PR和ROC曲线下的面积
val auROC=metrics.areaUnderROC()
保存并加载模型
model.save(sc,"target/tmp/scalaSVMWithSGDModel")
用load调用
val sameModel=SVMModel.load(sc,"target/tmp/scalaSVMWithSGDModel")
评估模型完成后,使用save()方法将模型保存至HDFS 目录下,下次使用可通过load()进行调用
|