IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark——Spark读Elasticsearch Index索引报错存在重复列名 -> 正文阅读

[大数据]Spark——Spark读Elasticsearch Index索引报错存在重复列名

问题背景

在通过Spark读取Elasticsearch的索引并映射成结构化的表,写入到Hive的时候,报错存在同名的列名/字段名。查看Elasticsearch对应index发现,index存在字段名相同但其中某些字母大小写不同的字段(例如,同一个index存在gmtCreate和gmtcreate)。

因为Elasticsearch index中字段默认是大小写敏感的,也就是说,可以存在我们刚才描述的那种情况。而Hive中表列名是大小写不敏感的,Spark读Elasticsearch index转成Dataframe之后,会将index中的字段名全都转成小写,那么在写入Hive的时候,就会报错说存在重名的列。

此处也给出,Spark整合Elasticsearch的Maven依赖:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark_2.11</artifactId>
    <version>2.4.1</version>
</dependency>

解决方法

那么,如果想要把ES index索引存入Hive,我们需要对重名列进行重命名。

  1. 当通过Spark将Elasticsearch index转成DataFrame的时候,我们需要保持列名的大小写敏感,而Spark SQL默认情况下对列名是大小写不敏感的,所以我们需要配置参数spark.sql.caseSensitive来开启列名的大小写敏感;
    val spark = SparkSession
                   .builder()
                   .appName("es index")
                   .config("es.nodes", "node-1,node-2,node-3,node-4,node-5")
                   .config("es.port", "9200")
                   .config("pushdown", "true")
                   .config("spark.sql.caseSensitive", "true") //列名大小写敏感
                   .enableHiveSupport()
                   .getOrCreate()
    
  2. 找出重名的列,并进行相应的处理
    import org.elasticsearch.spark._ 
    
    def process(spark: SparkSession): Unit = {
        import spark.implicits._
        //要读取的表中的source列(JSON字符串,JSONObject)对应着Elasticsearch index中的一条记录,
        //保留着index的原始列名(有大小写区分的)
        val ds = spark.read.table(s"db.ods_es_index}")
                    .mapPartitions(iterator => {
                        iterator.map(m => {
                            JSON.parseObject(m.getAs("source").toString).toString
                        })
                    })
            var df = spark.read.json(ds)
    		//获取Dataframe中所有列名
            val columns = df.columns
            //将所有列名都转成小写
            val columnsDistinct = columns.map(_.toLowerCase()).distinct
    
            //是否有重复列
            if (columns.length != columnsDistinct) {
                val set = new mutable.HashSet[String]()
                //保存重复列
                val duplicatedCols = mutable.ArrayBuffer[String]()
                for (col <- columns) {
                    if (set.contains(col.toLowerCase)) {
                        duplicatedCols.append(col)
                    } else {
                        set.add(col.toLowerCase)
                    }
                }
                //对重复列进行重命名
                for (col <- duplicatedCols) {
                    df = df.withColumnRenamed(col, col + "_1")
                }
            }
    
            df
                        .repartition(3)
                        .write
                        .mode("overwrite")
                        .format("parquet")
                        .saveAsTable(s"db.table")
    }
    

参考

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-29 09:09:56  更:2021-08-29 09:26:14 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 16:47:18-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码