IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 物流项目中SparkSQL的相关调优 -> 正文阅读

[大数据]物流项目中SparkSQL的相关调优

实时ETL开发之流计算程序【编程】

编写完成从Kafka消费数据,打印控制台上,其中创建SparkSession实例对象时,需要设置参数值。

package cn.itcast.logistics.etl.realtime

import cn.itcast.logistics.common.Configuration
import org.apache.commons.lang3.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 编写StructuredStreaming程序,实时从Kafka消息数据(物流相关数据和CRM相关数据),打印控制台Console
	 * 1. 初始化设置Spark Application配置
	 * 2. 判断Spark Application运行模式进行设置
	 * 3. 构建SparkSession实例对象
	 * 4. 初始化消费物流Topic数据参数
	 * 5. 消费物流Topic数据,打印控制台
	 * 6. 初始化消费CRM Topic数据参数
	 * 7. 消费CRM Topic数据,打印控制台
	 * 8. 启动流式应用,等待终止
 */
object LogisticsEtlApp {
	
	def main(args: Array[String]): Unit = {
		// step1. 构建SparkSession实例对象,设置相关属性参数值
		// 1. 初始化设置Spark Application配置
		val sparkConf = new SparkConf()
    		.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
			.set("spark.sql.session.timeZone", "Asia/Shanghai")
			.set("spark.sql.files.maxPartitionBytes", "134217728")
			.set("spark.sql.files.openCostInBytes", "134217728")
			.set("spark.sql.shuffle.partitions", "3")
			.set("spark.sql.autoBroadcastJoinThreshold", "67108864")
		// 2. 判断Spark Application运行模式进行设置
		if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
			//本地环境LOCAL_HADOOP_HOME
			System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
			//设置运行环境和checkpoint路径
			sparkConf
				.set("spark.master", "local[3]")
				.set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_WIN_CHECKPOINT_DIR)
		} else {
			//生产环境
			sparkConf
				.set("spark.master", "yarn")
				.set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_DFS_CHECKPOINT_DIR)
		}
		// 3. 构建SparkSession实例对象
		val spark: SparkSession = SparkSession.builder()
    		.config(sparkConf)
			.getOrCreate()
		import spark.implicits._
		
		// step2. 从Kafka实时消费数据,设置Kafka Server地址和Topic名称
		// step3. 将ETL转换后数据打印到控制台,启动流式应用
		// 4. 初始化消费物流Topic数据参数
		val logisticsDF: DataFrame = spark.readStream
			.format("kafka")
			.option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
			.option("subscribe", "logistics")
			.option("maxOffsetsPerTrigger", "100000")
			.load()
		// 5. 消费物流Topic数据,打印控制台
		logisticsDF.writeStream
			.queryName("query-logistics-console")
			.outputMode(OutputMode.Append())
			.format("console")
			.option("numRows", "10")
			.option("truncate", "false")
			.start()
		
		// 6. 初始化消费CRM Topic数据参数
		val crmDF: DataFrame = spark.readStream
			.format("kafka")
			.option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
			.option("subscribe", "crm")
			.option("maxOffsetsPerTrigger", "100000")
			.load()
		// 7. 消费CRM Topic数据,打印控制
		crmDF.writeStream
			.queryName("query-crm-console")
			.outputMode(OutputMode.Append())
			.format("console")
			.option("numRows", "10")
			.option("truncate", "false")
			.start()
		
		// step4. 流式应用启动以后,等待终止,关闭资源
		// 8. 启动流式应用,等待终止
		spark.streams.active.foreach(query => println("启动Query:" + query.name))
		spark.streams.awaitAnyTermination()
	}
	
}

SparkSQL 参数调优设置:

  • 1)、设置会话时区:set("spark.sql.session.timeZone", "Asia/Shanghai")

  • 2)、设置读取文件时单个分区可容纳的最大字节数

    set("spark.sql.files.maxPartitionBytes", "134217728")

  • 3)、设置合并小文件的阈值:set("spark.sql.files.openCostInBytes", "134217728")

  • 4)、设置 shuffle 分区数:set("spark.sql.shuffle.partitions", "4")

  • 5)、设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小

    set("spark.sql.autoBroadcastJoinThreshold", "67108864")

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-09 12:46:25  更:2022-05-09 12:46:55 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 6:31:46-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码