概述
在使用机器学习算法时,往往伴随着数据的前置处理,也就是我们常说的ETL数据清洗。在一般的分类算法中,输入数据的schema一般包含三个部分:特征列、标签列和其它列,其中特征列往往包含多个列。对于Spark mllib的API而言,基于rdd的API往往要求将若干个特征列构造成LabelPoint类,该类包含double类型的label和vector类型的features;基于dataframe的API要求传入并指定features列以及label列。接下来对多列特征聚合进行示例. LabelPoint的构造示例如下:
rdd.map(r => LabeledPoint(
r.getDouble(label),
Vectors.dense(features.map(r.getDouble(_)).toArray)
))
DataFrame API的标签、特征列设置示例如下:
train_df = new VectorAssembler()
.setInputCols(feature_col_name).setOutputCol("features")
其中“feature_col_name”是Array[String]类型,包含了DataFrame中特征列的列名。
Spark读取数据时,都是将数据以String类型整行读入,而String类型的数据输入往往无法满足算法所要求的的数据格式,因此需要对String类型的数据转化为向量类型。
数据转化
在读入数据时,往往是如下类型的数据格式:每一列以竖线分割,第一列为特征列,类型是稀疏向量,最后一列为标签列。该数据无法直接传入算法中进行处理,因为特征列是String类型,我们需要将该类型展开为密集向量类型,并传入算法中进行处理。
4:3 5:0.512 7:3|5|3|1
4:1 5:0.305 10:1 11:0.43|4|5|1
7:1 8:0.5|12|22|0
1:4 2:0.3 7:1 8:0.33 13:2|2|4|0
1:2 2:0.27 4:2 5:0.29 7:1|5|2|1
4:3 5:0.512 7:3|4|3|1
假设以上数据的schema为:
features - String
col1 - String
col2 - String
label - String
具体的处理代码如下:
val input: String = "xxx.txt"
val rawInput: Dataset[Row] = spark.read.option("sep", "|").csv(input)
val dataset = rawInput.select(
"label", "features")
.rdd
.map(l => (l._1, l._2.split(" ").map(_.split(":")).map(e => (e(0).toInt - 1, e(1).toDouble))))
.map(l => (l._1.toDouble, Vectors.sparse(3000, l._2.filter(_._2!=0).map(_._1), l._2.filter(_._2!=0).map(_._2))))
.map(l => (l._1, l._2.toDense))
.toDF("label", "features")
.show()
代码优化
在机器学习算法中,输入的数据集一般包含训练集、测试集和验证集,对于同一个算法而言,每一个数据集都需要进行相同的转化操作。上述向量转化代码过于冗长,复制多份会使得代码整体显得臃肿,后面发现可以使用udf的方式进行列转化的定义,使得该转化过程可以复用。
val stringToVector = udf((t: String) => {
val tmp_array = t.split(" ").map(_.split(":")).map(e => (e(0).toInt - 1, e(1).toDouble))
val tmp_vector = Vectors.sparse(max_col, tmp_array.filter(_._2 != 0).map(_._1), tmp_array.filter(_._2 != 0).map(_._2))
tmp_vector
})
train_df = train_df.withColumn("features", stringToVector(col(“feature”))
这种udf的方式可以用在DataFrame的各种数据转化上,包括数据内容的转化和数据类型的转化,能够使得我们更容易提炼出算法需要的数据格式。
|