Spark ML: DataFrame-based Linear Regression
Official documentation: https://spark.apache.org/docs/2.4.5/ml-guide.html
1. Predicting Boston housing prices with linear regression
package com.yyds.tags.ml.regression
import org.apache.spark.ml.linalg
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel
object BostonRegression {

  def main(args: Array[String]): Unit = {
    // Build a local SparkSession; lower shuffle partitions for a small local dataset
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[4]")
      .config("spark.sql.shuffle.partitions", 4)
      .getOrCreate()
    import spark.implicits._

    // Read the raw housing data and keep only well-formed lines
    // (whitespace-separated, 13 feature columns + 1 label column = 14 fields)
    val houseDS: Dataset[String] = spark.read
      .textFile("datas/housing/housing.data")
      .filter(line => null != line && line.trim.split("\\s+").length == 14)
    // Parse each line into (features, label): the last field is the house price (label),
    // the remaining fields form the dense feature vector
    val frame: DataFrame = houseDS.mapPartitions { iter =>
      iter.map { line =>
        val parts = line.trim.split("\\s+")
        val label: Double = parts(parts.length - 1).toDouble
        val features: linalg.Vector = Vectors.dense(parts.dropRight(1).map(_.toDouble))
        (features, label)
      }
    }.toDF("features", "label")
    frame.printSchema()
    frame.show(10, truncate = false)
    // Split into training and testing sets, and cache the training data
    val Array(trainingDF, testingDF) = frame.randomSplit(Array(0.8, 0.2))
    trainingDF.persist(StorageLevel.MEMORY_AND_DISK).count()

    // Configure and train the linear regression model
    val lr = new LinearRegression()
      .setFeaturesCol("features")
      .setLabelCol("label")
      .setStandardization(true)
      .setMaxIter(20)
      .setSolver("auto")
    val lrModel = lr.fit(trainingDF)

    // Inspect the fitted model and its training metrics
    println("coefficients = " + lrModel.coefficients)
    println("intercept = " + lrModel.intercept)
    val trainingSummary = lrModel.summary
    val rootMeanSquaredError = trainingSummary.rootMeanSquaredError
    println("rootMeanSquaredError = " + rootMeanSquaredError)

    // Predict on the held-out testing set
    lrModel.transform(testingDF).show(10, truncate = false)

    // Keep the application alive so the Spark Web UI can be inspected
    Thread.sleep(10000000L)
    spark.stop()
  }
}
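The example above only prints the training-set RMSE taken from the model summary. As a minimal sketch (assuming the lrModel and testingDF values defined above are in scope), the error on the held-out testing set can also be computed with Spark ML's RegressionEvaluator:

// Sketch: evaluate the fitted model on the held-out testing set.
// Assumes lrModel and testingDF from the example above are in scope.
import org.apache.spark.ml.evaluation.RegressionEvaluator

val predictions: DataFrame = lrModel.transform(testingDF)

val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")          // also supports "mse", "r2", "mae"

val testRmse: Double = evaluator.evaluate(predictions)
println("test RMSE = " + testRmse)

Comparing this test RMSE with the training RMSE gives a rough indication of how well the model generalizes.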
2. Reading the iris dataset, assembling the feature vector (features) and indexing the label (label)
package com.yyds.tags.ml.features
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{StandardScaler, StringIndexer, VectorAssembler}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}
object IrisFeaturesDemo {

  def main(args: Array[String]): Unit = {
    // Build a local SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[4]")
      .config("spark.sql.shuffle.partitions", 4)
      .getOrCreate()
    import spark.implicits._

    // Define the schema explicitly: four numeric measurements plus a string category
    val irisSchema: StructType = new StructType()
      .add("sepal_length", DoubleType, nullable = true)
      .add("sepal_width", DoubleType, nullable = true)
      .add("petal_length", DoubleType, nullable = true)
      .add("petal_width", DoubleType, nullable = true)
      .add("category", StringType, nullable = true)

    // Read the headerless CSV file using the schema above (no schema inference)
    val irisDF: DataFrame = spark.read
      .option("sep", ",")
      .option("header", "false")
      .option("inferSchema", "false")
      .schema(irisSchema)
      .csv("datas/iris/iris.data")
    irisDF.printSchema()
    irisDF.show(10, truncate = false)
    // Assemble the four numeric columns into a single feature vector
    val assembler: VectorAssembler = new VectorAssembler()
      .setInputCols(irisDF.columns.dropRight(1))
      .setOutputCol("features")
    val df = assembler.transform(irisDF)

    // Convert the string category into a numeric label index
    val indexer = new StringIndexer()
      .setInputCol("category")
      .setOutputCol("label")
    val indexedDF: DataFrame = indexer
      .fit(df)
      .transform(df)
    indexedDF.printSchema()
    indexedDF.show(10, truncate = false)

    // Scale the feature vector to unit standard deviation without centering the mean
    val scaler = new StandardScaler()
      .setInputCol("features")
      .setOutputCol("scaled_features")
      .setWithStd(true)
      .setWithMean(false)
    val scalerModel = scaler.fit(indexedDF)
    val scaledDataDF: DataFrame = scalerModel.transform(indexedDF)
    scaledDataDF.printSchema()
    scaledDataDF.show(100, truncate = false)
    // Train a multinomial logistic regression on the scaled features;
    // standardization is disabled because the features are already scaled
    val lr: LogisticRegression = new LogisticRegression()
      .setFeaturesCol("scaled_features")
      .setLabelCol("label")
      .setMaxIter(20)
      .setStandardization(false)
      .setFamily("multinomial")
    val lrModel = lr.fit(scaledDataDF)
    println(s"coefficient matrix: ${lrModel.coefficientMatrix}")

    // Keep the application alive so the Spark Web UI can be inspected
    Thread.sleep(10000000L)
    spark.stop()
  }
}
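The feature stages above (VectorAssembler, StringIndexer, StandardScaler) and the classifier can also be chained into a single Pipeline, so that fitting and applying the whole preprocessing-plus-model chain happens in one pass. A minimal sketch, assuming the assembler, indexer, scaler, lr and irisDF values defined in the example above:

// Sketch: chain the feature stages and the classifier into one ML Pipeline.
// Assumes assembler, indexer, scaler, lr and irisDF from the example above are in scope.
import org.apache.spark.ml.{Pipeline, PipelineModel}

val pipeline = new Pipeline()
  .setStages(Array(assembler, indexer, scaler, lr))

// Fit all stages in order on the raw iris DataFrame
val pipelineModel: PipelineModel = pipeline.fit(irisDF)

// Apply the whole chain: assemble -> index -> scale -> predict
pipelineModel.transform(irisDF)
  .select("features", "label", "prediction")
  .show(10, truncate = false)

A fitted PipelineModel keeps the individual preprocessing steps and the model together, which also makes it straightforward to persist and reload the whole chain as one unit.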