IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark dataframe写入hive、hdfs、es、oracle -> 正文阅读

[大数据]spark dataframe写入hive、hdfs、es、oracle

1.写入hive的指定分区中写入数据

ResulToOra2.createTempView("tmp")
    spark.sql(
      s"""
         |insert overwrite table pdr.T_PSSC_TRAN_PORTRAIT_CENTER partition(dt='${sdfDay.format(date)}') select * from  tmp
         |
                   """.stripMargin)

    println("write hive end")

2.写入HDFS

          ResulToES
          .coalesce(1)
          .write.format("csv")
          .mode("overwrite")
          .option("header", "true")
          .option("encoding", "utf-8")
          .save(s"hdfs://master01:8020/tmp/tran_es_${sdfMonth.format(date)}")
          
          println("write hdfs end")

2.1 这里coalesce(1)的作用是:一次生成一个文件,如果数据量太大,建议多建各个分区

3.写入ES

 val cfg = Map(
      "es.nodes" -> FileProperties.ES_NODES, // 应用所在的服务器地址
      "es.port" -> FileProperties.ES_PORT,  //  端口号
      //      "es.nodes.wan.only" -> "true",
      "es.resource" -> s"pssc_tran_portrait-${sdfMonth.format(date)}",//模板
      "es.mapping.id" -> "TRAN_ID",//主键
      "es.write.operation" -> "upsert",
      "es.net.http.auth.user" -> "账号",
      "es.net.http.auth.pass" -> "密码"
    )
    //写入ES
    EsSparkSQL.saveToEs(ResulToES, cfg)
    println("write es end")

3.1DataFrame写入ES只需要写一个map

3.2导入import org.elasticsearch.spark.sql.EsSparkSQL

3.3EsSparkSQL.saveToEs(ResulToES, cfg)

3.4前3步都是代码中的,ES中也需要写一个模板,下面是我工作中写过的一个模板,可以参考一下

PUT /_template/pssc_tran_portrait_template
{
    "index_patterns": "pssc_tran_portrait-*", 
    "settings": {
        "number_of_shards": 3
    }, 
    "mappings": {
        "_source": {
            "enabled": true
        }, 
        "properties": {
            "10102": {
                "type": "keyword", 
                "store": true
            }, 
            "10107": {
                "type": "keyword", 
                "store": true
            }, 
            "10108": {
                "type": "keyword", 
                "store": true
            }, 
            "10109": {
                "type": "keyword", 
                "store": true
            }, 
            "10110": {
                "type": "keyword", 
                "store": true
            }, 
            "10111": {
                "type": "keyword", 
                "store": true
            }, 
            "10112": {
                "type": "keyword", 
                "store": true
            }, 
            "10113": {
                "type": "keyword", 
                "store": true
            }, 
            "10114": {
                "type": "keyword", 
                "store": true
            }, 
            "10115": {
                "type": "keyword", 
                "store": true
            }, 
            "10116": {
                "type": "keyword", 
                "store": true
            }, 
            "10117": {
                "type": "keyword", 
                "store": true
            }, 
            "10118": {
                "type": "keyword", 
                "store": true
            }, 
            "10119": {
                "type": "keyword", 
                "store": true
            }, 
            "10120": {
                "type": "keyword", 
                "store": true
            }, 
            "10121": {
                "type": "keyword", 
                "store": true
            }, 
            "10210": {
                "type": "keyword", 
                "store": true
            }, 
            "10213": {
                "type": "keyword", 
                "store": true
            }, 
            "10214": {
                "type": "keyword", 
                "store": true
            }, 
            "10215": {
                "type": "keyword", 
                "store": true
            }, 
            "10216": {
                "type": "keyword", 
                "store": true
            }, 
            "10217": {
                "type": "keyword", 
                "store": true
            }, 
            "10220": {
                "type": "keyword", 
                "store": true
            }, 
            "10221": {
                "type": "keyword", 
                "store": true
            }, 
            "30102": {
                "type": "keyword", 
                "store": true
            }, 
            "30107": {
                "type": "keyword", 
                "store": true
            }, 
            "tran_id": {
                "type": "keyword", 
                "store": true, 
                "ignore_above": 80
            }, 
            "tran_name": {
                "type": "keyword", 
                "store": true
            }, 
            "currYearPQ": {
                "type": "nested", 
                "properties": {
                    "year_month": {
                        "type": "integer", 
                        "store": true
                    }, 
                    "P_MONTH_MAX": {
                        "type": "double", 
                        "store": true
                    }, 
                    "Q_MONTH_MAX": {
                        "type": "double", 
                        "store": true
                    }
                }
            }, 
            "lastYearPQ": {
                "type": "nested", 
                "properties": {
                    "last_year_month": {
                        "type": "integer", 
                        "store": true
                    }, 
                    "LAST_P_MONTH_MAX": {
                        "type": "double", 
                        "store": true
                    }, 
                    "LAST_Q_MONTH_MAX": {
                        "type": "double", 
                        "store": true
                    }
                }
            }
        }
    }, 
    "aliases": {
        "search-pssc_tran_portrait": { }
    }
}

4.写入oracle

4.1公司中一般对删除Oracle中数据进行封装

def delAnyTable(tabName: String, condition: String): Unit = {
    var conn: Connection = null
    var delPS: PreparedStatement = null
    val delSql = s"delete  $tabName where $condition "
    println("delSql:"+delSql)
    try {
      Class.forName(FileProperties.oracleDriver) 
      conn = DriverManager.getConnection(ORA_PDR_URL, ORA_PDR_CONNP)
      delPS = conn.prepareStatement(delSql)
      delPS.execute()
      delPS.close()
//      conn.commit()
      println(s"====================删除 $tabName 完毕!")
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (delPS != null) {
        delPS.close()
     //   conn.commit()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }

4.2 这里删除Oracle数据时,因为Oracle的url和配置被写死,只需要传入了表名和过滤条件,就可以对表进去删除操作,但无法对不同的Oracle数据库进行删除,你也可以增加参数url和Properties,来达到对不同数据库进行删除操作

def delAnyTable(tabName: String, condition: String,url :String,connp :Properties): Unit = {
    var conn: Connection = null
    var delPS: PreparedStatement = null
    val delSql = s"delete  $tabName where $condition "
    println("delSql:"+delSql)
    try {
      Class.forName(FileProperties.oracleDriver)
      conn = DriverManager.getConnection(url, connp)
      delPS = conn.prepareStatement(delSql)
      delPS.execute()
      delPS.close()
      //      conn.commit()
      println(s"====================删除 $tabName 完毕!")
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (delPS != null) {
        delPS.close()
        //   conn.commit()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }

4.3同样对插入Oracle中的数据进行封装


//private def getType(df: DataFrame) = df.schema.fields.map(_.dataType.typeName).map(r => if (r.startsWith("decimal")) "decimal" else r)

//public static final Integer BATCH_COMMIT_CNT = 10000;

//public static final Integer OVER_COMMIT_CNT = 1000000;

def toOracleInsert(df: DataFrame, tableName: String) {

    val schema: Array[String] = getType(df)
    val fields = df.schema.fieldNames.mkString(",")
    val fields1 = schema.map(r => "?").mkString(",")
//      schema.foreach(println)
    val sql = s"""insert into $tableName ($fields) values($fields1)"""

    df.repartition(16).foreachPartition(row => {
      var conn: Connection = null
      var ps: PreparedStatement = null

      try {
        Class.forName(FileProperties.oracleDriver)
        conn = DriverManager.getConnection(ORA_PDR_URL, ORA_PDR_CONNP)
        ps = conn.prepareStatement(sql)
        conn.setAutoCommit(false)
        var iCnt: Int = 0
        row.foreach(data => {
          iCnt += 1
          ExecPS(data, ps, schema)
          ps.addBatch()

          if (iCnt % FileProperties.BATCH_COMMIT_CNT == 0 && iCnt != 0) {
            println(s"============================$iCnt")
            ps.executeBatch()
            ps.clearBatch()
          }
          if (iCnt % FileProperties.OVER_COMMIT_CNT == 0 && iCnt != 0){
            conn.commit()
          }
        })

        println(s"============================$iCnt")
        ps.executeBatch()
        ps.clearBatch()
        conn.commit()

      } finally {
        if (ps != null) {
          ps.close()
        }
        if (conn != null) {
          conn.close()
        }
      }
    })
  }

4.4 同样也可以增加参数,达到对不同Oracle数据库中的数据进行删除操作

//private def getType(df: DataFrame) = df.schema.fields.map(_.dataType.typeName).map(r => if (r.startsWith("decimal")) "decimal" else r)

//public static final Integer BATCH_COMMIT_CNT = 10000;

//public static final Integer OVER_COMMIT_CNT = 1000000;


def toOracleInsert(df: DataFrame, tableName: String,url :String,connp :Properties) {

    val schema = getType(df)
    val fields = df.schema.fieldNames.mkString(",")
    val fields1 = schema.map(r => "?").mkString(",")
//      schema.foreach(println)
    val sql = s"""insert into $tableName ($fields) values($fields1)"""

    df.repartition(16).foreachPartition(row => {
      var conn: Connection = null
      var ps: PreparedStatement = null

      try {
        Class.forName(FileProperties.oracleDriver)
        conn = DriverManager.getConnection(url, connp)
        ps = conn.prepareStatement(sql)
        conn.setAutoCommit(false)
        var iCnt: Int = 0
        row.foreach(data => {
          iCnt += 1
          ExecPS(data, ps, schema)
          ps.addBatch()

          if (iCnt % FileProperties.BATCH_COMMIT_CNT == 0 && iCnt != 0) {
            println(s"============================$iCnt")
            ps.executeBatch()
            ps.clearBatch()
          }
          if (iCnt % FileProperties.OVER_COMMIT_CNT == 0 && iCnt != 0){
            conn.commit()
          }
        })

        println(s"============================$iCnt")
        ps.executeBatch()
        ps.clearBatch()
        conn.commit()

      } finally {
        if (ps != null) {
          ps.close()
        }
        if (conn != null) {
          conn.close()
        }
      }
    })
  }

4.5 以上的方法一般同时调用

ResaveToOracle.delAnyTable("T_PSSC_TRAN_PORTRAIT_CENTER", s"STAT_DATE=${sdfMonth.format(date)}")
ResaveToOracle.toOracleInsert(ResulToOra2, "T_PSSC_TRAN_PORTRAIT_CENTER")

5.总结

以上都是企业实战,支持回写。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-01 12:00:29  更:2021-09-01 12:01:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:14:23-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码