IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark 的 StructedStreaming -> 正文阅读

[大数据]Spark 的 StructedStreaming

Spark 的 StructedStreaming

一、Spark Streaming的不足

1、基于微批,延迟高,不能做真正的实时

2、DStream基于RDD,不直接支持SQL

3、流批处理的API应用层不统一(流用的DStream–底层是RDD,批的用DF/DS/RDD)

4、不支持EventTime事件时间

  • EventTime事件时间:事件真正发生的时间
  • PorcessingTime处理时间:事件被流系统处理的时间
  • IngestionTime摄入时间:事件到达流系统的时间
  • 如:一条错误日志10月1日,23:59:00秒产生的(事件时间),因为网络延迟,到10月2日00:00:10到达日志处理系统(摄入时间),10月2日00:00:20被流系统处理(处理时间)
  • 如果要统计10月1日的系统bug数量,那么SparkStreaming不能正确统计,因为它不支持事件时间

5、数据的Exactly-Once(恰好一次语义)需要手动实现

  • 注:数据的一次性语义
  • 最多一次
  • 恰好一次-----是我们的目标,Spark Streaming如果要实现恰好一次,需要手动维护偏移量
  • 最少一次
二、StructedStreaming
1、StructedStreaming位置

在这里插入图片描述

2、StructedStreaming介绍

编程模型:无界表/动态表格

数据抽象:DataFrame/DataSet

在这里插入图片描述

3、Source-----数据源代码示例

(1)StructedStreaming -----Socket

package com.jiang.StructedStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Source_Socket {

  def main(args: Array[String]): Unit = {

    //TODO 0.创建环境
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions","3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    //TODO 1.加载数据
    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host","192.168.89.15")
      .option("port","9999")
      .load()

    //TODO 2.处理数据
    val ds: Dataset[String] = df.as[String]
    val result: Dataset[Row] = ds.flatMap(_.split("\\s+"))
      .groupBy('value)
      .count()
      .orderBy('count.desc)

    //TODO 3.输出结果
    result.writeStream.format("console")
      .outputMode("complete")
    //TODO 4.启动并等待结果
      .start()
      .awaitTermination()
//    ssc.start()
//    ssc.awaitTermination() // 流式应用启动之后需要一直运行等待手动停止/等待数据到来

    //TODO 5.关闭资源
    spark.stop()

  }

}

(2)StructedStreaming -----Rate

package com.jiang.StructedStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Source_Rate {

  def main(args: Array[String]): Unit = {

    //TODO 0.创建环境
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions","3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    //TODO 1.加载数据
    val df: DataFrame = spark.readStream
      .format("rate")
      .option("rowsPerSecond","10") // 每秒生成数据条数
      .option("rampUpTime","0s")    // 每条数据生成时间间隔
      .option("numPartitions","2")  // 分区数目
      .load()

    //TODO 2.处理数据


    //TODO 3.输出结果
    df.writeStream.format("console")
      .outputMode("append")
      .option("truncate",false)
    //TODO 4.启动并等待结果
      .start()
      .awaitTermination()  // 对列不进行截断,就是展示全列内容
//    ssc.start()
//    ssc.awaitTermination() // 流式应用启动之后需要一直运行等待手动停止/等待数据到来

    //TODO 5.关闭资源
    spark.stop()

  }

}

(3)StructedStreaming -----File

package com.jiang.StructedStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object Source_File {

  def main(args: Array[String]): Unit = {

    //TODO 0.创建环境
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions","3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")


    val csvSchema:StructType = new StructType()
      .add("id",IntegerType,nullable = true)
      .add("name",StringType,nullable = true)
      .add("age",IntegerType,nullable = true)

    //TODO 1.加载数据
    val df: DataFrame = spark.readStream
//      .format("rate")
      .option("sep",",") // 每秒生成数据条数
      .option("header","false")    // 每条数据生成时间间隔
      .schema(csvSchema)  // 分区数目
      .format("csv").load("data/input/person.csv")

    //TODO 2.处理数据


    //TODO 3.输出结果
    df.writeStream.format("console")
      .outputMode("append")
      .option("truncate",false)
    //TODO 4.启动并等待结果
      .start()
      .awaitTermination()  // 对列不进行截断,就是展示全列内容
//    ssc.start()
//    ssc.awaitTermination() // 流式应用启动之后需要一直运行等待手动停止/等待数据到来

    //TODO 5.关闭资源
    spark.stop()

  }

}

(4)StructedStreaming -----Operation

package com.jiang.StructedStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Operation_Socket {

  def main(args: Array[String]): Unit = {

    //TODO 0.创建环境
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions","3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    //TODO 1.加载数据
    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host","192.168.89.15")
      .option("port","9999")
      .load()

    //TODO 2.处理数据
    // ==========DSL========
    val ds: Dataset[String] = df.as[String]
    val wordsDS: Dataset[String] = ds.flatMap(_.split("\\s+"))

    val result: Dataset[Row] = wordsDS
      .groupBy('value)
      .count()
      .orderBy('count.desc)

    // ==========SQL========
    wordsDS.createOrReplaceTempView("t_words")
    val sql:String =
      """
        |select value,count(*) as counts
        |from t_words
        |group by value
        |order by counts desc
      """.stripMargin
    val result2: DataFrame = spark.sql(sql)

    //TODO 3.输出结果
    result.writeStream.format("console")
      .outputMode("complete")
    //TODO 4.启动并等待结果
      .start()
//      .awaitTermination() // 流式应用启动之后需要一直运行等待手动停止/等待数据到来


    result2.writeStream.format("console")
      .outputMode("complete")
      //TODO 4.启动并等待结果
      .start()
      .awaitTermination()

    //TODO 5.关闭资源
    spark.stop()

  }

}
4、Sink

(1)StructedStreaming -----Output Modes

在这里插入图片描述

Sink-------ForeachBatch

scala代码:

package com.jiang.StructedStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql._

object Sink_ForeachBatch {

  val url = "jdbc:mysql://localhost:3306/bigdata_test"
  var username = "root"
  val passwd = "123456"

  def main(args: Array[String]): Unit = {

    //TODO 0.创建环境
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions","3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    //TODO 1.加载数据
    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host","192.168.89.15")
      .option("port","9999")
      .load()

    //TODO 2.处理数据
    // ==========DSL========
    val ds: Dataset[String] = df.as[String]
    val wordsDS: Dataset[String] = ds.flatMap(_.split("\\s+"))

    val result: Dataset[Row] = wordsDS
      .groupBy('value)
      .count()
      .orderBy('count.desc)



    //TODO 3.输出结果
    result.writeStream
        .foreachBatch((ds:Dataset[Row],batchId:Long)=>{
          // 自定义输出控制台
          println("==========自定义输出控制台==========")
          println("batchId------>" + batchId)
          println("==========自定义输出控制台==========")
          ds.show()

          // 自定义输出到Mysql
          ds.coalesce(1)
            .write
            .mode(SaveMode.Overwrite)
            .format("jdbc")
            .option("url",url)
            .option("user",username)
            .option("password",passwd)
            .option("dbtable","bigdata_test.struct_words") // option("dbtable","数据库.表") 没有表会自动创建
            .save()
        })
        .outputMode("complete")
        .start()
        .awaitTermination()


    //TODO 5.关闭资源
    spark.stop()

  }

}

触发间隔:(Tirgger、Checkpoint)

package com.jiang.StructedStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Sink_Tirgger_Checkpoint {

  def main(args: Array[String]): Unit = {

    //TODO 0.创建环境
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions","3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    //TODO 1.加载数据
    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host","192.168.89.15")
      .option("port","9999")
      .load()

    //TODO 2.处理数据
    val ds: Dataset[String] = df.as[String]
    val result: Dataset[Row] = ds.coalesce(1).flatMap(_.split("\\s+"))
      .groupBy('value)
      .count()
      //.orderBy('count.desc)

    //TODO 3.输出结果
    result.writeStream
      .format("console")
      .outputMode("complete")
      //触发间隔
      //1.默认就是不写
      //2.指定时间间隔
      //.trigger(Trigger.ProcessingTime("5 seconds"))
      //.trigger(Trigger.Once())  // 触发一次
      .trigger(Trigger.Continuous("1 seconds")) // 连续处理并指定Checkpoint时间间隔,实验阶段,实时的
      .option("checkpointLocation","./ckp"+System.currentTimeMillis())
    //TODO 4.启动并等待结果
      .start()
      .awaitTermination()
//    ssc.start()
//    ssc.awaitTermination() // 流式应用启动之后需要一直运行等待手动停止/等待数据到来

    //TODO 5.关闭资源
    spark.stop()

  }

}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-10 14:37:33  更:2021-07-10 14:38:32 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/6 20:40:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码