Real-Time ETL Development: Stream Processing Program [Coding]
Write a program that consumes data from Kafka and prints it to the console. When creating the SparkSession instance, the relevant configuration parameters need to be set.
package cn.itcast.logistics.etl.realtime

import cn.itcast.logistics.common.Configuration
import org.apache.commons.lang3.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}

object LogisticsEtlApp {

  def main(args: Array[String]): Unit = {
    // 1. Build SparkConf and set the SparkSQL tuning parameters
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .set("spark.sql.session.timeZone", "Asia/Shanghai")
      .set("spark.sql.files.maxPartitionBytes", "134217728")
      .set("spark.sql.files.openCostInBytes", "134217728")
      .set("spark.sql.shuffle.partitions", "3")
      .set("spark.sql.autoBroadcastJoinThreshold", "67108864")

    // Run locally on Windows/Mac during development, otherwise submit to YARN
    if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
      System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
      sparkConf
        .set("spark.master", "local[3]")
        .set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_WIN_CHECKPOINT_DIR)
    } else {
      sparkConf
        .set("spark.master", "yarn")
        .set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_DFS_CHECKPOINT_DIR)
    }

    // 2. Create the SparkSession instance with the configuration above
    val spark: SparkSession = SparkSession.builder()
      .config(sparkConf)
      .getOrCreate()
    import spark.implicits._

    // 3. Consume the logistics topic from Kafka
    val logisticsDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
      .option("subscribe", "logistics")
      .option("maxOffsetsPerTrigger", "100000")
      .load()
    // Print the logistics data to the console
    logisticsDF.writeStream
      .queryName("query-logistics-console")
      .outputMode(OutputMode.Append())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()

    // 4. Consume the crm topic from Kafka
    val crmDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
      .option("subscribe", "crm")
      .option("maxOffsetsPerTrigger", "100000")
      .load()
    // Print the CRM data to the console
    crmDF.writeStream
      .queryName("query-crm-console")
      .outputMode(OutputMode.Append())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()

    // 5. Wait for any of the running streaming queries to terminate
    spark.streams.active.foreach(query => println("Started query: " + query.name))
    spark.streams.awaitAnyTermination()
  }

}
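The program references several constants from cn.itcast.logistics.common.Configuration that are not shown in this section. A minimal sketch of what that object might look like is given below; only the constant names come from the program above, while the package layout and all values are placeholders to be replaced with the project's actual settings.

package cn.itcast.logistics.common

// Hypothetical sketch of the Configuration object referenced by LogisticsEtlApp.
// The constant names come from the program above; the values are placeholders.
object Configuration {
  // Local Hadoop installation used when running on Windows/Mac
  val LOCAL_HADOOP_HOME: String = "D:/softwares/hadoop-3.0.0"
  // Checkpoint directory used when running locally
  val SPARK_APP_WIN_CHECKPOINT_DIR: String = "file:///D:/checkpoint/logistics-etl"
  // Checkpoint directory on HDFS used when running on YARN
  val SPARK_APP_DFS_CHECKPOINT_DIR: String = "hdfs://node2.itcast.cn:8020/apps/logistics/checkpoint"
}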
SparkSQL tuning parameters:
1) Set the session time zone: set("spark.sql.session.timeZone", "Asia/Shanghai")
2) Set the maximum number of bytes a single partition can hold when reading files: set("spark.sql.files.maxPartitionBytes", "134217728")
3) Set the estimated cost of opening a file, used as the threshold when packing small files into a partition: set("spark.sql.files.openCostInBytes", "134217728")
4) Set the number of shuffle partitions: set("spark.sql.shuffle.partitions", "3")
5) Set the maximum size in bytes of a table that can be broadcast to all worker nodes when performing a join: set("spark.sql.autoBroadcastJoinThreshold", "67108864")
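These values are fixed in SparkConf before the SparkSession is created, but their effective values can also be checked, and session-level ones adjusted, at runtime through spark.conf. A small sketch, assuming spark is the SparkSession created in the program above:

// Check the effective values of the tuning parameters at runtime
println(spark.conf.get("spark.sql.session.timeZone"))           // Asia/Shanghai
println(spark.conf.get("spark.sql.shuffle.partitions"))         // 3
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold")) // 67108864

// Session-level SparkSQL settings can also be adjusted after the session is created
spark.conf.set("spark.sql.shuffle.partitions", "6")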