在spark里,一个action算子会生成一个job,一个重分区算子形成宽依赖会划分出前后两个stage。
spark会先读取代码然后生成执行计划,这个过程中,每个rdd和dataframe的分区数会计算出来,在driver端print的内容也会打印出来(代码日志),等于把整个spark代码加载的一个过程。
然后再执行转换算子里的操作,操作里的打印内容在executor端打印出来,
最后执行action算子的操作,把action算子的结果在driver端(代码日志)打印出来。
def read_hive_table(self, start=-1, length=1):
读取hdfs路径生成rdd转df
print "1. df的分区个数:", task_df.rdd.getNumPartitions()
df.show(1)
return df
def process_detour_reason(self, data_df):
print "2. df的分区个数:", data_df.rdd.getNumPartitions()
定义udf1
data_df_1 = data_df.repartition(8).withColumn(XXX, udf1(XXXX))
print "3. df的分区个数:", data_df_2.rdd.getNumPartitions()
data_df_1.show(1)
def process(self):
table_data_df = self.read_hive_table(-1, 1)
table_result_df_1 = self.process_detour_reason(table_data_df)
self.fs.write_fs_for_df(table_result_df_2, "XXXX")
当执行到函数process_detour_reason时,会发现打印内容如下:
2. df的分区个数: 10
3. df的分区个数: 8
----process_detour_reason函数任务执行中-----
此时正在执行process_detour_reason函数,df的每一条数据在调用udf进行处理,但是发现此时task并未按照预期的开8个分区并行计算,而是只运行了1个task。
原因如下:
spark中一个action算子触发一个job,会将整个job里的转换算子全部执行一遍(数据转换),最后通过action算子进行数据输出到driver端;
也就是说,因为有action算子show(1)的存在,df的repartition和withColumn这两个数据转换算子会得以生效,但是show(1)比较特殊,因为其只需要返回1条数据,所以内部做了优化,只会计算一个分区的数据,然后返回一条到driver端,这也就是为什么在spark的ui中看到这一步骤只有一个task在执行;
因为在这个job里,8个分区,只会执行1个分区的数据,计算完毕后,通过show(1)返回给driver端;
那剩下7个分区的数据什么时候执行呢?在最后saveAs将df的数据存储下来时,由于save也是一个action算子,所以触发save算子时,会执行剩下7个分区的udf计算。
总结:
1. 一般一个spark代码里,除非涉及数据的临时保存,需要用到persist、cache等,只需要在代码结尾处设置一个action算子即可,这样action算子之前的所有转换操作都会得到执行;
2. spark代码的预加载机制,会在生成dag图时,把代码里的print执行出来,所以并不是print1,udf执行,再print2;而是print1,print2,加载完毕dag生成,udf执行;
3. rdd转dataframe后,分区数不变;
4. df.repartition().转换算子,会先重分区,再执行转换算子;
5. repartition()用于增加或减少分区数,coalesc()只用于减少分区数;
6. show(1)一般用于代码调试,实际运算时,建议用count()作为执行算子,这样不会对重分区造成影响;
7. 查询df的分区数:df.rdd.getNumPartitions()
spark代码的执行过程:
先从前往后执行,加载代码,遇到action算子后,生成一个job,然后从后往前划分stage,然后再从前往后执行。
|