IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> pySpark,执行算子show(1)导致并行度不对的问题 -> 正文阅读

[大数据]pySpark,执行算子show(1)导致并行度不对的问题

在spark里,一个action算子会生成一个job,一个重分区算子形成宽依赖会划分出前后两个stage。

spark会先读取代码然后生成执行计划,这个过程中,每个rdd和dataframe的分区数会计算出来,在driver端print的内容也会打印出来(代码日志),等于把整个spark代码加载的一个过程。

然后再执行转换算子里的操作,操作里的打印内容在executor端打印出来,

最后执行action算子的操作,把action算子的结果在driver端(代码日志)打印出来。

def read_hive_table(self, start=-1, length=1):
    读取hdfs路径生成rdd转df
    print "1. df的分区个数:", task_df.rdd.getNumPartitions()
    df.show(1)
    return df

def process_detour_reason(self, data_df):
    print "2. df的分区个数:", data_df.rdd.getNumPartitions()
    定义udf1
    data_df_1 = data_df.repartition(8).withColumn(XXX, udf1(XXXX))
    print "3. df的分区个数:", data_df_2.rdd.getNumPartitions()
    data_df_1.show(1)

def process(self):
    table_data_df = self.read_hive_table(-1, 1)
    table_result_df_1 = self.process_detour_reason(table_data_df)
    self.fs.write_fs_for_df(table_result_df_2, "XXXX")

当执行到函数process_detour_reason时,会发现打印内容如下:

2. df的分区个数: 10
3. df的分区个数: 8
----process_detour_reason函数任务执行中-----

此时正在执行process_detour_reason函数,df的每一条数据在调用udf进行处理,但是发现此时task并未按照预期的开8个分区并行计算,而是只运行了1个task。

原因如下:

spark中一个action算子触发一个job,会将整个job里的转换算子全部执行一遍(数据转换),最后通过action算子进行数据输出到driver端;

也就是说,因为有action算子show(1)的存在,df的repartition和withColumn这两个数据转换算子会得以生效,但是show(1)比较特殊,因为其只需要返回1条数据,所以内部做了优化,只会计算一个分区的数据,然后返回一条到driver端,这也就是为什么在spark的ui中看到这一步骤只有一个task在执行;

因为在这个job里,8个分区,只会执行1个分区的数据,计算完毕后,通过show(1)返回给driver端;

那剩下7个分区的数据什么时候执行呢?在最后saveAs将df的数据存储下来时,由于save也是一个action算子,所以触发save算子时,会执行剩下7个分区的udf计算。

总结:

1. 一般一个spark代码里,除非涉及数据的临时保存,需要用到persist、cache等,只需要在代码结尾处设置一个action算子即可,这样action算子之前的所有转换操作都会得到执行;

2. spark代码的预加载机制,会在生成dag图时,把代码里的print执行出来,所以并不是print1,udf执行,再print2;而是print1,print2,加载完毕dag生成,udf执行;

3. rdd转dataframe后,分区数不变;

4. df.repartition().转换算子,会先重分区,再执行转换算子;

5. repartition()用于增加或减少分区数,coalesc()只用于减少分区数;

6. show(1)一般用于代码调试,实际运算时,建议用count()作为执行算子,这样不会对重分区造成影响;

7. 查询df的分区数:df.rdd.getNumPartitions()

spark代码的执行过程:

先从前往后执行,加载代码,遇到action算子后,生成一个job,然后从后往前划分stage,然后再从前往后执行。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-10-31 12:04:09  更:2022-10-31 12:05:08 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/28 18:49:29-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码