IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> structure streaming 使用小结1-效率(数据更新速度以及优化) -> 正文阅读

[大数据]structure streaming 使用小结1-效率(数据更新速度以及优化)

版本spark3.0.0,总结来源官网以及开发过程中的实验所得。

背景,做的一个实时项目,通过测试发现数据更新速度在15s左右,完全不能满足要求,领导要求查找可以优化的地方,对整个链路进行拆分研究,优化代码

链路为kafka->structure streaming->hbase

测试方法:

虚机

--num-executors 10 --executor-cores 3 \
--driver-memory 4g --executor-memory 6g \
--total-executor-cores 30 \

测试方法,数据源为kafka,输出写入hbase。

造数采用kafka批量写入数据,structure streaming进行消费处理,处理逻辑,关联一个维表,然后统计条数。区别于官网的worldcount。

代码结构

def main(args: Array[String]): Unit = {
 //创建spark session
    val spark = SparkSession.builder
      .appName("jcTables_37")
      //      .master("local[4]")
      .config("spark.sql.warehouse.dir",directory1)
      .getOrCreate()

 //读取mysql的维表数据
    val url = MysqlUtil.url
    val properties = MysqlUtil.properties
    val tn1 = "b_mapping_siteclerk"
    spark.read.jdbc(url, tn1, properties)
      .createOrReplaceTempView("b_mapping_siteclerk")

 /**
      *  实时流数据表加载
      */
    //读取流数据
    //读取流数据 t_odscb_sales
    getJcSaleStream(topic, spark, className)

 /**
      *  实时流数据表逻辑计算
      */
    //销售查询
    getJcSaleSql(spark, className)

spark.streams.awaitAnyTermination()

}

 //读取流数据 t_odscb_sales
  private def getJcSaleStream(topic: Array[String], spark: SparkSession, className: String) = {
    import spark.implicits._
    try {
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", PropertyUtil.getProperty("kafka.bootstrap.servers_jc",""))
        .option("subscribe", topic(0))
        .option("startingOffsets", "earliest")
        .load()
        .selectExpr("CAST(value AS STRING)")
        .as[String]
        .filter(line => line.split(",").length > 2)
//        .dropDuplicates()
        .map(line => {
          val mes = line.split(",")
        val provincial_centre_id = mes(7)
        val terminal_id =  mes(8)
        val business_date = mes(3)
        val total_cost = mes(21)
        val ticket_id = mes(2)
        val clerk_id = mes(10)
        val sales_time = mes(4)
        val terminal_no = mes(9)
        (provincial_centre_id,terminal_id,business_date,total_cost,ticket_id,clerk_id,sales_time,terminal_no)
        })
        .toDF("provincial_centre_id","terminal_id","business_date","total_cost","ticket_id","clerk_id","sales_time","terminal_no")
        .repartition(100,new Column("terminal_id"))
        .createOrReplaceTempView("t_odscb_sales")
      logRoot.warn("竞彩报表 t_odscb_sales 表加载完毕 " + className)
      //    val a=1/0
    } catch {
      case ex: Exception => {
        logRoot.error("读取 t_odscb_sales 出错" + className)
        ex.printStackTrace()
        logRoot.error(ex)
      }
    }
  }

 //销售查询
  private def getJcSaleSql(spark: SparkSession, className: String) = {
    try {
      println("313 directory1 销售")
      val sql_sale = JcTableSql.getSql_sale
//      println(sql_sale)
      val df1 = spark.sql(sql_sale)
      val query1 = df1
        .repartition(100,new Column("rowkey"))
        .writeStream
        .outputMode("update")
        .foreach(new SportsPeriodSale)
        //            .format("console")
        .option("checkpointLocation", directory1)
        .start()
      GetLogRoor.logRoot.warn("313 directory1 销售 竞彩 sql_sale 加载完毕 " + className)
      //    val a=1/0
    } catch {
      case ex: Exception => {
        GetLogRoor.logRoot.error("竞彩 sql_sale 查询出错 " + className)
        ex.printStackTrace()
        GetLogRoor.logRoot.error(ex)
      }
    }
  }
 


记录6个时间,

1、数据生成时间,程序产生数据时的时间戳

2、kafka数据进入时间,kafka消费打印时间戳

3、spark 生成df时间,当前时间戳作为df一列

4、spark执行sql时间,sql执行加入当前时间戳

5、通过foreach写数据时间,在hbase sink方法中生成时间戳

6、数据进入hbase时间,hbase记录时间戳,基本与5一致,相差2ms左右

其中,6-2是整个数据更新的过程耗时。

测试单条数据,多次测试,使用6的时间减去2的时间在1.8s左右。

并发测试,

500并发,4-7s

1000并发,4-10s

10000并发,6-15s

50000并发,6-17s

100000并发,6-20s。

其中并发较小时,耗时较大的是5-4,并发较高时,4-3的时间也会增加。

测试后的优化策略,优化4,5两步。

优化方法:

1、轻量化df处理,最开始使用的全量的scheme方法,对比原来的关系型数据库进行df的构建。优化方法,根据输出字段,选择部分构建df,例如,原来40个字段,只用了8个,就选取8个进行df构建。

 (provincial_centre_id,terminal_id,business_date,total_cost,ticket_id,clerk_id,sales_time,terminal_no)
        })
        .toDF("provincial_centre_id","terminal_id","business_date","total_cost","ticket_id","clerk_id","sales_time","terminal_no")

2、优化sql,这个只能尽力,大部分难以优化,特别是逻辑复杂的sql,无法优化的sql。此处优化空间不大。

3、重分区,进行2次重分区,一次是在df之后,建表之前。

.toDF("provincial_centre_id","terminal_id","business_date","total_cost","ticket_id","clerk_id","sales_time","terminal_no")
        .repartition(100,new Column("terminal_id"))
        .createOrReplaceTempView("t_odscb_sales")

第二次是在流写入之前。重分区后,对于高并发量数据,有3s左右的速度提升

val query2 = df2
        .repartition(100,new Column("rowkey"))
        .writeStream

主要耗时还是在4到5的过程,ForeachWriter的过程中。此处如何优化,并未找出

最后优化结果,需求在10s左右(考虑并发),可以满足,要求秒级的无法满足,走其他途径。

小结:个人测试结论,使用structure streaming,可以满足低延迟(2-3s)需求,对于要求毫秒级的数据,无法实现。

同事说过一句话很好,大数据实时处理,如果少了大数据三个字,就没了精髓。对于大量并发数据,structure streaming的处理速度基本不受影响,不会因为数据量太大,延迟猛增。

因此需求对应的技术选择很重要,毫秒级需求,请走关系库。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-10 11:34:54  更:2021-07-10 11:35:14 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 18:36:45-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码