| 版本spark3.0.0,总结来源官网以及开发过程中的实验所得。 背景,做的一个实时项目,通过测试发现数据更新速度在15s左右,完全不能满足要求,领导要求查找可以优化的地方,对整个链路进行拆分研究,优化代码 链路为kafka->structure streaming->hbase 测试方法: 虚机 --num-executors 10 --executor-cores 3 \
--driver-memory 4g --executor-memory 6g \
--total-executor-cores 30 \
 测试方法,数据源为kafka,输出写入hbase。 造数采用kafka批量写入数据,structure streaming进行消费处理,处理逻辑,关联一个维表,然后统计条数。区别于官网的worldcount。 代码结构 def main(args: Array[String]): Unit = {
 //创建spark session
    val spark = SparkSession.builder
      .appName("jcTables_37")
      //      .master("local[4]")
      .config("spark.sql.warehouse.dir",directory1)
      .getOrCreate()
 //读取mysql的维表数据
    val url = MysqlUtil.url
    val properties = MysqlUtil.properties
    val tn1 = "b_mapping_siteclerk"
    spark.read.jdbc(url, tn1, properties)
      .createOrReplaceTempView("b_mapping_siteclerk")
 /**
      *  实时流数据表加载
      */
    //读取流数据
    //读取流数据 t_odscb_sales
    getJcSaleStream(topic, spark, className)
 /**
      *  实时流数据表逻辑计算
      */
    //销售查询
    getJcSaleSql(spark, className)
spark.streams.awaitAnyTermination()
}
 //读取流数据 t_odscb_sales
  private def getJcSaleStream(topic: Array[String], spark: SparkSession, className: String) = {
    import spark.implicits._
    try {
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", PropertyUtil.getProperty("kafka.bootstrap.servers_jc",""))
        .option("subscribe", topic(0))
        .option("startingOffsets", "earliest")
        .load()
        .selectExpr("CAST(value AS STRING)")
        .as[String]
        .filter(line => line.split(",").length > 2)
//        .dropDuplicates()
        .map(line => {
          val mes = line.split(",")
        val provincial_centre_id = mes(7)
        val terminal_id =  mes(8)
        val business_date = mes(3)
        val total_cost = mes(21)
        val ticket_id = mes(2)
        val clerk_id = mes(10)
        val sales_time = mes(4)
        val terminal_no = mes(9)
        (provincial_centre_id,terminal_id,business_date,total_cost,ticket_id,clerk_id,sales_time,terminal_no)
        })
        .toDF("provincial_centre_id","terminal_id","business_date","total_cost","ticket_id","clerk_id","sales_time","terminal_no")
        .repartition(100,new Column("terminal_id"))
        .createOrReplaceTempView("t_odscb_sales")
      logRoot.warn("竞彩报表 t_odscb_sales 表加载完毕 " + className)
      //    val a=1/0
    } catch {
      case ex: Exception => {
        logRoot.error("读取 t_odscb_sales 出错" + className)
        ex.printStackTrace()
        logRoot.error(ex)
      }
    }
  }
 //销售查询
  private def getJcSaleSql(spark: SparkSession, className: String) = {
    try {
      println("313 directory1 销售")
      val sql_sale = JcTableSql.getSql_sale
//      println(sql_sale)
      val df1 = spark.sql(sql_sale)
      val query1 = df1
        .repartition(100,new Column("rowkey"))
        .writeStream
        .outputMode("update")
        .foreach(new SportsPeriodSale)
        //            .format("console")
        .option("checkpointLocation", directory1)
        .start()
      GetLogRoor.logRoot.warn("313 directory1 销售 竞彩 sql_sale 加载完毕 " + className)
      //    val a=1/0
    } catch {
      case ex: Exception => {
        GetLogRoor.logRoot.error("竞彩 sql_sale 查询出错 " + className)
        ex.printStackTrace()
        GetLogRoor.logRoot.error(ex)
      }
    }
  }
 
 记录6个时间, 1、数据生成时间,程序产生数据时的时间戳 2、kafka数据进入时间,kafka消费打印时间戳 3、spark 生成df时间,当前时间戳作为df一列 4、spark执行sql时间,sql执行加入当前时间戳 5、通过foreach写数据时间,在hbase sink方法中生成时间戳 6、数据进入hbase时间,hbase记录时间戳,基本与5一致,相差2ms左右 其中,6-2是整个数据更新的过程耗时。 测试单条数据,多次测试,使用6的时间减去2的时间在1.8s左右。 并发测试, 500并发,4-7s 1000并发,4-10s 10000并发,6-15s 50000并发,6-17s 100000并发,6-20s。 其中并发较小时,耗时较大的是5-4,并发较高时,4-3的时间也会增加。 测试后的优化策略,优化4,5两步。 优化方法: 1、轻量化df处理,最开始使用的全量的scheme方法,对比原来的关系型数据库进行df的构建。优化方法,根据输出字段,选择部分构建df,例如,原来40个字段,只用了8个,就选取8个进行df构建。  (provincial_centre_id,terminal_id,business_date,total_cost,ticket_id,clerk_id,sales_time,terminal_no)
        })
        .toDF("provincial_centre_id","terminal_id","business_date","total_cost","ticket_id","clerk_id","sales_time","terminal_no")
 2、优化sql,这个只能尽力,大部分难以优化,特别是逻辑复杂的sql,无法优化的sql。此处优化空间不大。 3、重分区,进行2次重分区,一次是在df之后,建表之前。 .toDF("provincial_centre_id","terminal_id","business_date","total_cost","ticket_id","clerk_id","sales_time","terminal_no")
        .repartition(100,new Column("terminal_id"))
        .createOrReplaceTempView("t_odscb_sales")
 第二次是在流写入之前。重分区后,对于高并发量数据,有3s左右的速度提升 val query2 = df2
        .repartition(100,new Column("rowkey"))
        .writeStream
 主要耗时还是在4到5的过程,ForeachWriter的过程中。此处如何优化,并未找出 最后优化结果,需求在10s左右(考虑并发),可以满足,要求秒级的无法满足,走其他途径。 小结:个人测试结论,使用structure streaming,可以满足低延迟(2-3s)需求,对于要求毫秒级的数据,无法实现。 同事说过一句话很好,大数据实时处理,如果少了大数据三个字,就没了精髓。对于大量并发数据,structure streaming的处理速度基本不受影响,不会因为数据量太大,延迟猛增。 因此需求对应的技术选择很重要,毫秒级需求,请走关系库。 |