IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> structure streaming 使用小结3-输出模式(appendupdatecomplete) -> 正文阅读

[大数据]structure streaming 使用小结3-输出模式(appendupdatecomplete)

常规使用不提,主要是使用不同模式完成排序,数据更新操作,算是对小结2的补充优化。

1、排序,使用complete模式,将数据流看做静态表,不断追加数据,通过order by可以实现排序功能。

val df =spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "*:9092")
//      .option("kafka.bootstrap.servers", "*:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .filter(line => line.split(",").length > 2)
//      //        .dropDuplicates()
      .map(line => {
      val mes = line.split(",")
      val provincial_centre_id = mes(0).toInt
      val terminal_id =  mes(1).toLong
      val timestamp = mes(2).toLong
      val value = mes(3).toInt
      val time = MyUtils.getSqlTimes(timestamp)

      (provincial_centre_id,terminal_id,timestamp,value,time)
    })
      .toDF("provincial_centre_id","terminal_id","timestamp","value","time")
      .selectExpr("provincial_centre_id","terminal_id" ,"timestamp","value","time")
      .withWatermark("time","10 seconds")
//      .groupBy("time","provincial_centre_id","terminal_id","timestamp","value").count()
//      .repartition(100,new Column("terminal_id"))
      .createOrReplaceTempView("t_odscb_sales")
//    spark.sql("select * from b_mapping_siteclerk limit 3").show()
    println("=====================>2")

//    val sql_sale = "select terminal_id,max(provincial_centre_id) as provincial_centre_id," +
//      " max(timestamp) as timestamp,1 as value,max(time) as time ,count(1)" +
//      "  from t_odscb_sales  " +
//      " group by terminal_id " +
//      "   "
    val sql_sale = "select terminal_id,provincial_centre_id," +
      " timestamp,time ,count(1)" +
      "  from t_odscb_sales  " +
      " group by terminal_id,provincial_centre_id,timestamp,time" +
  " order by timestamp " +
      "   "
//    //      println(sql_sale)
    val df1 = spark.sql(sql_sale)
//    val query1 = df
      .writeStream
      .outputMode("complete")
//      .foreach(new Test)

      .format("console")
      .option("checkpointLocation", directory1)
      .start()
    println("=====================>3")
    spark.streams.awaitAnyTermination()

排序结果,好用

?

官网说配合withWatermark实现数据过期处理,通过实验,结果失败,数据不能过期,随着数据量增大,表的数据会很大。如何配合使用,有待后续继续研究。(?.withWatermark("time","10 seconds")无效)

?

2、update模式配合max聚合,不支持排序order by。进行计数统计,使用?.withWatermark("time","10 seconds")无效。

?

?去重可用,.dropDuplicates("timestamp")

?

去重加时间范围。去重可用,?withWatermark("time","10 seconds")无效。

?

去重加sql语句的时间控制进行数据处理

.dropDuplicates("timestamp")
" where   timestamp >= unix_timestamp()-60 " +

3、append

.withWatermark("time","10 seconds")延迟一个批次显示

使用聚合函数需要配合

.withWatermark("time","10 seconds")使用

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-29 14:48:52  更:2021-07-29 14:49:12 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/22 23:36:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码