问题背景
在通过Spark读取Elasticsearch的索引并映射成结构化的表,写入到Hive的时候,报错存在同名的列名/字段名。查看Elasticsearch对应index发现,index存在字段名相同但其中某些字母大小写不同的字段(例如,同一个index存在gmtCreate和gmtcreate)。
因为Elasticsearch index中字段默认是大小写敏感的,也就是说,可以存在我们刚才描述的那种情况。而Hive中表列名是大小写不敏感的,Spark读Elasticsearch index转成Dataframe之后,会将index中的字段名全都转成小写,那么在写入Hive的时候,就会报错说存在重名的列。
此处也给出,Spark整合Elasticsearch的Maven依赖:
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark_2.11</artifactId>
<version>2.4.1</version>
</dependency>
解决方法
那么,如果想要把ES index索引存入Hive,我们需要对重名列进行重命名。
- 当通过Spark将Elasticsearch index转成DataFrame的时候,我们需要保持列名的大小写敏感,而Spark SQL默认情况下对列名是大小写不敏感的,所以我们需要配置参数spark.sql.caseSensitive来开启列名的大小写敏感;
val spark = SparkSession
.builder()
.appName("es index")
.config("es.nodes", "node-1,node-2,node-3,node-4,node-5")
.config("es.port", "9200")
.config("pushdown", "true")
.config("spark.sql.caseSensitive", "true")
.enableHiveSupport()
.getOrCreate()
- 找出重名的列,并进行相应的处理
import org.elasticsearch.spark._
def process(spark: SparkSession): Unit = {
import spark.implicits._
val ds = spark.read.table(s"db.ods_es_index}")
.mapPartitions(iterator => {
iterator.map(m => {
JSON.parseObject(m.getAs("source").toString).toString
})
})
var df = spark.read.json(ds)
val columns = df.columns
val columnsDistinct = columns.map(_.toLowerCase()).distinct
if (columns.length != columnsDistinct) {
val set = new mutable.HashSet[String]()
val duplicatedCols = mutable.ArrayBuffer[String]()
for (col <- columns) {
if (set.contains(col.toLowerCase)) {
duplicatedCols.append(col)
} else {
set.add(col.toLowerCase)
}
}
for (col <- duplicatedCols) {
df = df.withColumnRenamed(col, col + "_1")
}
}
df
.repartition(3)
.write
.mode("overwrite")
.format("parquet")
.saveAsTable(s"db.table")
}
参考
|