Spark 从Vector提取值作为新列加入DataFrame
背景:spark 机器学习模型的输出概率值是一个vector(0的概率和1的概率),现在需要提取出vector的一个值(1的概率)作为新的一列,然后和预测前的字段一起输出到hive 问题:如果直接筛选vector字段输出到hive会报格式错误 解决方案:
- 选取需要输出的列名,并筛选出所需要的dataframe
- 从某列Vector中提取值和key作为新的dataframe
- 合并两个dataframe,输出到hive
代码细节
val columnsArr = XpredictionTest.columns.slice(0, 24) :+ XpredictionTest.columns.slice(29, 50)
val tmpDf1 = XpredictionTest.select(columnsArr.map(name => col(name)): _*)
val tmpDf2 = XpredictionTest.select("key", "probabilities").rdd.map(t => {
val prob = t(1).asInstanceOf[DenseVector](1)
var newCol = 0
(t(0).toString, prob)
}).toDF("key_b", "prob_1")
val resDF = tmpDf3.join(tmpDf2,tmpDf3("key")===tmpDf2("key_b"),"inner")
resDF.write.mode("append").format("hive").saveAsTable("xxxx")
|