IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark与hive,mysql交互 -> 正文阅读

[大数据]spark与hive,mysql交互

spark读取hive表的数据处理后存到mysql

●agg返回DF类型 括号里接收的是列 所以可以在括号中给列起别名
○直接写count返回的是df 无法给列起别名
●join 所要查询的数据放在leftjoin左边
●注意方法的返回值 确定返回类型是df还是其他类型
●当遇到联查列重复时,对应的df(列名)
●join的写法
○df1.join(df2,Seq(列名),"left")
○rdf1.join(df2,df1(列名)===df2(列名),"left")

package expandword

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, date_format, desc, rank, sum}

object four {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("rigion")
      .enableHiveSupport()
      .getOrCreate()

    //使用hive的ods库
    spark.sql("use shenyunhang")

    val ord=spark.table("orders")
    val cus=spark.table("customer")
    val nat=spark.table("nation")
    val reg=spark.table("region ")

    val rs=ord
      .join(cus,Seq("custkey"),joinType = "left")
      .join(nat,cus.col("nationkey")===nat.col("nationkey"))
      .join(reg,nat.col("regionkey")===reg.col("regionkey"))
      .select(
        nat("name").as("nname"),
        reg("name").as("rname"),
        ord("totalprice"),
        (date_format(ord("orderdate"),"yyyyMM")).as("times")
      )
      .groupBy(col("nname"),col("rname"),col("times"))
      .agg(
        sum("totalprice").as("sum")
      )
      .orderBy("sum")
      .select(
        col("nname"),
        col("rname"),
        col("sum"),
        rank() over(Window.orderBy(desc("sum")))
      )
      
      rs.show()

      //落地
    rs.coalesce(1).write
      .format("jdbc")
      .mode(SaveMode.Overwrite)
      .option("driver","com.mysql.cj.jdbc.Driver")
      .option("url","jdbc:mysql://59.111.104.241:9999/job023_group4")
      .option("dbtable","ORDERS")
      .option("user","job023_group4")
      .option("password","job023@TL")
      .save()
  }
}

插入mysql的增量数据到hive、动态分区

需要注意的是在我们从MySQl拿到数据动态分区插入到Hive中时,是需要配置的。
●开启动态分区参数设置(还有其他配置,这里用这两个就可以)
○hive.exec.dynamic.partition=true
■开启动态分区功能(默认 true ,开启)
○hive.exec.dynamic.partition.mode=nonstrict
■设置为非严格模式(动态分区的模式,默认 strict ,表示必须指定至少一个分区为静态分区, nonstrict 模式表示允许所有的分区字段都可以使用动态分区。)
●在load完后,使用.where可以对加载出来的数据进行筛选
●在这里因为动态分区有格式要求,所以用hive中自带的date_format()方法,进行格式转换
●增量的概念就是每次插入的时候,插进去的数据hive中原来没有的数据,而不是overrite全部重新加载到里边,在这里用到的是在mysql中查到的数据创建一个视图,然后再hive中sql对这个视图和hive中数据的表进行left join查询,最后只取null的数据

package expandword

import org.apache.spark.sql.{SaveMode, SparkSession}

object MysqlToHive2 {
  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession
      .builder()
      .appName("reda mysql to hive limit")
//      .master("local[*]")
      .enableHiveSupport()
      .config("hive.exec.dynamic.partition","true")
      .config("hive.exec.dynamic.partition.mode","nonstrict")
      .getOrCreate()
    
    //加载mysql的数据
    val df = sparkSession.read
      .format("jdbc")
      .option("driver","com.mysql.cj.jdbc.Driver")
      .option("url","jdbc:mysql://59.111.104.241:9999/job023_group4")
      .option("dbtable","ORDERS")
      .option("user","job023_group4")
      .option("password","job023@TL")
      .load()
      .where(  "orderdate>='1997-12-1'")
      .createOrReplaceTempView("mysql_orders")

    //使用hive中的数据
    sparkSession.sql("use shenyunhang")

    sparkSession.sql(
      """
        |insert into table orders1 partition(times)
        |select t1.*,date_format(t1.orderdate,'yyyyMMdd') times
        |from
        |   mysql_orders t1
        |left join
        |   orders1 t2
        |on
        |   t1.orderkey = t2.orderkey
        |where t2.orderkey is null
        |""".stripMargin)
  }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-10-31 12:04:09  更:2022-10-31 12:05:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/29 1:39:35-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码