1. 逻辑回归分类器
1.1 数据处理
只保留其中的两个类别(去掉 Iris-setosa),使用全部四个特征,实现二分类
package com.zz.spark
import org.apache.spark.sql.{Row, SparkSession, functions}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{HashingTF, IndexToString, StringIndexer, Tokenizer, VectorIndexer}
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression, LogisticRegressionModel}
/**
 * Logistic-regression example, data-preparation part: loads the CSV records,
 * drops the `Iris-setosa` rows so that two label values remain, and prints
 * each remaining row as `label:features`.
 */
object WordCount {

  /** One CSV record: the first four columns as a feature vector, the fifth as the label. */
  case class Iris(features: Vector, label: String)

  /**
   * Reads the CSV from a hard-coded local path, registers it as the `iris`
   * temp view, filters it with SQL and prints the result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("lr")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // Never reassigned, so a val is sufficient.
    val inputPath = "/Users/zz/Desktop/fwmagic-spark-mllib/data/xgboost/sample.csv"

    // Every line is split on "," into four numeric columns followed by the label.
    val data = spark.sparkContext.textFile(inputPath)
      .map(_.split(","))
      .map(p => Iris(
        Vectors.dense(
          p(0).toDouble,
          p(1).toDouble,
          p(2).toDouble,
          p(3).toDouble
        ),
        p(4) // already a String, no conversion needed
      )).toDF()
    data.show(false)

    // createOrReplaceTempView does not throw when a view named "iris" already
    // exists in the session returned by getOrCreate().
    data.createOrReplaceTempView("iris")
    val df = spark.sql("select * from iris where label != 'Iris-setosa'")

    // Put the label in front of the features.
    df.map(t => s"${t(1)}:${t(0)}")
      .collect().foreach(println)
    // Iris-versicolor:[7.0,3.2,4.7,1.4]
    // Iris-versicolor:[6.4,3.2,4.5,1.5]
  }
}
训练集、测试集划分
// 70/30 train/test split. The explicit seed makes the split repeatable, so the
// metrics reported further down can be reproduced from run to run.
val Array(trainData, testData) = df.randomSplit(Array(0.7, 0.3), 42L)
1.2 构建pipeline
1. 对标签列和特征列进行索引
// Label indexer: "label" -> "indexerLabel". Fitted on df (the full data set,
// not only trainData).
val labelIndexer = {
  val indexer = new StringIndexer()
  indexer.setInputCol("label")
  indexer.setOutputCol("indexerLabel")
  indexer.fit(df)
}

// Feature indexer: "features" -> "indexerFeatures", also fitted on df.
val featureIndex = {
  val indexer = new VectorIndexer()
  indexer.setInputCol("features")
  indexer.setOutputCol("indexerFeatures")
  indexer.fit(df)
}
2. 构建模型
// Logistic regression over the indexed label/feature columns.
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3) // regularization parameter
  .setElasticNetParam(0.8)
  .setFeaturesCol("indexerFeatures")
  .setLabelCol("indexerLabel")
3. 定义labelConverter,把预测的类别重新转化成字符型的
// Converts the numeric "prediction" column back to label strings, using the
// labels held by the fitted labelIndexer.
val labelConverter = new IndexToString()
  .setLabels(labelIndexer.labels)
  .setOutputCol("predictedLabel")
  .setInputCol("prediction")
4. 构建pipeline
// Stage order: label indexing, feature indexing, classifier, label conversion.
val lrPipeline = new Pipeline().setStages(
  Array(labelIndexer, featureIndex, lr, labelConverter)
)
// Fit the whole pipeline on the training split.
val lrPipelineModel = lrPipeline.fit(trainData)
1.3 模型预测
pipeline --> Estimator,pipelineModel --> Transformer
// Apply the fitted pipeline to the test split.
val lrPredictions = lrPipelineModel.transform(testData)
lrPredictions.show(false)
// Printed columns, in order: true label, feature vector, probability of
// belonging to each class, predicted label.
// The column is called "features"; the previous spelling "featues" names no
// column, so the select could not be resolved.
lrPredictions.select("label", "features", "probability", "predictedLabel")
  .collect()
  .foreach(println)
1.4 模型评估
// The metric is set explicitly: without setMetricName the evaluator falls back
// to its default metric (f1), and "1.0 - accuracy" below would not be the
// test error. This matches the decision-tree example, which also sets "accuracy".
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexerLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(lrPredictions)
println("test error = " + (1.0-accuracy))
// test error = 0.38579067990832705
1.5 获取模型相关参数
// Stage index 2 of the pipeline is the classifier (after the two indexers).
val lrModel = lrPipelineModel.stages(2).asInstanceOf[LogisticRegressionModel]
// Printed in order: coefficients, intercept, numClasses, numFeatures.
Seq[Any](
  lrModel.coefficients,
  lrModel.intercept,
  lrModel.numClasses,
  lrModel.numFeatures
).foreach(println)
// [-0.07036101525577013,0.0,0.0,0.08604194883092327]
// 0.10233333140234194
// 2
// 4
2. 决策树分类器
学习时利用训练数据,根据损失函数最小化的原则建立决策树模型。
2.1 DT学习的3个步骤
2.1.1 特征选择
?
?
2.1.2 决策树的生成
?
2.1.3 决策树的剪枝
2.2 构建DT Pipeline
package com.zz.spark
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.classification.{DecisionTreeClassifier,DecisionTreeClassificationModel}
/**
 * Decision-tree example: builds a pipeline (label indexer, feature indexer,
 * DecisionTreeClassifier, label converter), fits it on a 70/30 split of the
 * CSV data, and prints the test error and a few properties of the fitted tree.
 */
object DT {

  /** One CSV record: the first four columns as a feature vector, the fifth as the label. */
  case class Iris(features: Vector, label: String)

  /**
   * Runs the whole example against a hard-coded local CSV path.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("lr") // NOTE(review): same app name as the logistic-regression example — confirm intended
      .master("local")
      .getOrCreate()
    import spark.implicits._

    try {
      val inputPath = "/Users/zz/Desktop/fwmagic-spark-mllib/data/xgboost/sample.csv"

      // Every line is split on "," into four numeric columns followed by the label.
      val data = spark.sparkContext.textFile(inputPath)
        .map(_.split(","))
        .map(p => Iris(
          Vectors.dense(
            p(0).toDouble,
            p(1).toDouble,
            p(2).toDouble,
            p(3).toDouble
          ),
          p(4) // already a String, no conversion needed
        )).toDF()

      // createOrReplaceTempView does not throw when a view named "iris" already
      // exists in the session returned by getOrCreate().
      data.createOrReplaceTempView("iris")
      val df = spark.sql("select * from iris")
      // The former `df.map(t => t(1) + ":" + t(0))` statement was dropped: its
      // result was discarded and no action was ever run on it.

      // Explicit seed so the split, and the metrics below, are repeatable.
      val Array(trainData, testData) = df.randomSplit(Array(0.7, 0.3), 42L)

      // Both indexers are fitted on df (the full data set, not only trainData).
      val labelIndexer = new StringIndexer()
        .setInputCol("label")
        .setOutputCol("indexerLabel")
        .fit(df)
      val featureIndex = new VectorIndexer()
        .setInputCol("features")
        .setOutputCol("indexerFeatures")
        .setMaxCategories(4)
        .fit(df)

      // Converts the numeric "prediction" column back to label strings.
      val labelConverter = new IndexToString()
        .setInputCol("prediction")
        .setOutputCol("predictedLabel")
        .setLabels(labelIndexer.labels)

      val dt = new DecisionTreeClassifier()
        .setLabelCol("indexerLabel")
        .setFeaturesCol("indexerFeatures")

      val dtPipeline = new Pipeline()
        .setStages(Array(labelIndexer, featureIndex, dt, labelConverter))
      val dtPipelineModel = dtPipeline.fit(trainData)

      val dtPredictions = dtPipelineModel.transform(testData)
      dtPredictions.select("label", "features", "predictedLabel")
        .show(false)

      val evaluator = new MulticlassClassificationEvaluator()
        .setLabelCol("indexerLabel")
        .setPredictionCol("prediction")
        .setMetricName("accuracy")
      val accuracy = evaluator.evaluate(dtPredictions)
      println("test error = " + (1.0-accuracy))
      // Sample output (exact values depend on the split):
      // test error = 0.020408163265306145

      // Stage index 2 of the pipeline is the classifier.
      val dtModel = dtPipelineModel.stages(2).asInstanceOf[DecisionTreeClassificationModel]
      println(dtModel.featureImportances)
      // (4,[0,1,2,3],[0.008953900709219849,0.03880023640661938,0.8662341500107458,0.0860117128734151])
      println(dtModel.rootNode)
      // InternalNode(prediction = 0.0, impurity = 0.663464366238604, split = org.apache.spark.ml.tree.ContinuousSplit@999a0003)
      println(dtModel.numClasses)
      // 3
      println(dtModel.numFeatures)
      // 4
    } finally {
      // Release the session even when one of the steps above throws.
      spark.stop()
    }
  }
}
2.3 打印决策树
// Print the textual description of the fitted tree.
val treeDescription = dtModel.toDebugString
println(treeDescription)
?
?
?
?
?
|