|
版本spark3.0.0,总结来源官网以及开发过程中的实验所得。
背景,做的一个实时项目,通过测试发现数据更新速度在15s左右,完全不能满足要求,领导要求查找可以优化的地方,对整个链路进行拆分研究,优化代码
链路为kafka->structure streaming->hbase
测试方法:
虚机
--num-executors 10 --executor-cores 3 \
--driver-memory 4g --executor-memory 6g \
--total-executor-cores 30 \
测试方法,数据源为kafka,输出写入hbase。
造数采用kafka批量写入数据,structure streaming进行消费处理,处理逻辑,关联一个维表,然后统计条数。区别于官网的worldcount。
代码结构
def main(args: Array[String]): Unit = {
//创建spark session
val spark = SparkSession.builder
.appName("jcTables_37")
// .master("local[4]")
.config("spark.sql.warehouse.dir",directory1)
.getOrCreate()
//读取mysql的维表数据
val url = MysqlUtil.url
val properties = MysqlUtil.properties
val tn1 = "b_mapping_siteclerk"
spark.read.jdbc(url, tn1, properties)
.createOrReplaceTempView("b_mapping_siteclerk")
/**
* 实时流数据表加载
*/
//读取流数据
//读取流数据 t_odscb_sales
getJcSaleStream(topic, spark, className)
/**
* 实时流数据表逻辑计算
*/
//销售查询
getJcSaleSql(spark, className)
spark.streams.awaitAnyTermination()
}
//读取流数据 t_odscb_sales
private def getJcSaleStream(topic: Array[String], spark: SparkSession, className: String) = {
import spark.implicits._
try {
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", PropertyUtil.getProperty("kafka.bootstrap.servers_jc",""))
.option("subscribe", topic(0))
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.filter(line => line.split(",").length > 2)
// .dropDuplicates()
.map(line => {
val mes = line.split(",")
val provincial_centre_id = mes(7)
val terminal_id = mes(8)
val business_date = mes(3)
val total_cost = mes(21)
val ticket_id = mes(2)
val clerk_id = mes(10)
val sales_time = mes(4)
val terminal_no = mes(9)
(provincial_centre_id,terminal_id,business_date,total_cost,ticket_id,clerk_id,sales_time,terminal_no)
})
.toDF("provincial_centre_id","terminal_id","business_date","total_cost","ticket_id","clerk_id","sales_time","terminal_no")
.repartition(100,new Column("terminal_id"))
.createOrReplaceTempView("t_odscb_sales")
logRoot.warn("竞彩报表 t_odscb_sales 表加载完毕 " + className)
// val a=1/0
} catch {
case ex: Exception => {
logRoot.error("读取 t_odscb_sales 出错" + className)
ex.printStackTrace()
logRoot.error(ex)
}
}
}
//销售查询
private def getJcSaleSql(spark: SparkSession, className: String) = {
try {
println("313 directory1 销售")
val sql_sale = JcTableSql.getSql_sale
// println(sql_sale)
val df1 = spark.sql(sql_sale)
val query1 = df1
.repartition(100,new Column("rowkey"))
.writeStream
.outputMode("update")
.foreach(new SportsPeriodSale)
// .format("console")
.option("checkpointLocation", directory1)
.start()
GetLogRoor.logRoot.warn("313 directory1 销售 竞彩 sql_sale 加载完毕 " + className)
// val a=1/0
} catch {
case ex: Exception => {
GetLogRoor.logRoot.error("竞彩 sql_sale 查询出错 " + className)
ex.printStackTrace()
GetLogRoor.logRoot.error(ex)
}
}
}
记录6个时间,
1、数据生成时间,程序产生数据时的时间戳
2、kafka数据进入时间,kafka消费打印时间戳
3、spark 生成df时间,当前时间戳作为df一列
4、spark执行sql时间,sql执行加入当前时间戳
5、通过foreach写数据时间,在hbase sink方法中生成时间戳
6、数据进入hbase时间,hbase记录时间戳,基本与5一致,相差2ms左右
其中,6-2是整个数据更新的过程耗时。
测试单条数据,多次测试,使用6的时间减去2的时间在1.8s左右。
并发测试,
500并发,4-7s
1000并发,4-10s
10000并发,6-15s
50000并发,6-17s
100000并发,6-20s。
其中并发较小时,耗时较大的是5-4,并发较高时,4-3的时间也会增加。
测试后的优化策略,优化4,5两步。
优化方法:
1、轻量化df处理,最开始使用的全量的scheme方法,对比原来的关系型数据库进行df的构建。优化方法,根据输出字段,选择部分构建df,例如,原来40个字段,只用了8个,就选取8个进行df构建。
(provincial_centre_id,terminal_id,business_date,total_cost,ticket_id,clerk_id,sales_time,terminal_no)
})
.toDF("provincial_centre_id","terminal_id","business_date","total_cost","ticket_id","clerk_id","sales_time","terminal_no")
2、优化sql,这个只能尽力,大部分难以优化,特别是逻辑复杂的sql,无法优化的sql。此处优化空间不大。
3、重分区,进行2次重分区,一次是在df之后,建表之前。
.toDF("provincial_centre_id","terminal_id","business_date","total_cost","ticket_id","clerk_id","sales_time","terminal_no")
.repartition(100,new Column("terminal_id"))
.createOrReplaceTempView("t_odscb_sales")
第二次是在流写入之前。重分区后,对于高并发量数据,有3s左右的速度提升
val query2 = df2
.repartition(100,new Column("rowkey"))
.writeStream
主要耗时还是在4到5的过程,ForeachWriter的过程中。此处如何优化,并未找出
最后优化结果,需求在10s左右(考虑并发),可以满足,要求秒级的无法满足,走其他途径。
小结:个人测试结论,使用structure streaming,可以满足低延迟(2-3s)需求,对于要求毫秒级的数据,无法实现。
同事说过一句话很好,大数据实时处理,如果少了大数据三个字,就没了精髓。对于大量并发数据,structure streaming的处理速度基本不受影响,不会因为数据量太大,延迟猛增。
因此需求对应的技术选择很重要,毫秒级需求,请走关系库。
|