1、RDD性质
RDD数据是过程数据,即计算得到后一个RDD时,前一个RDD就会被释放。如图就是当RDD4被计算出来RDD3就会被释放。
2、为什么要持久化呢?
如图,一个应用程序中存在两个collect,中间链路都调用了RDD3,可是在第一个collect执行完时,由于RDD时过程数据,RDD3已经被释放,所以执行第二个collect需要重新计算RDD1、RDD2、RDD3,这样就浪费资源。为了避免重新计算RDD3,则对RDD3进行持久化。
3、RDD持久化方法
rdd3.cache() # 等效于下一行代码
rdd3.persist(StorageLevel.MEMORY_ONLY) # 在内存中持久化
rdd3.persist(StorageLevel.DISK_ONLY) # 在硬盘中持久化
4、测试RDD持久化效果?
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
def f(x):
print("rdd1")
return x * 10
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3])
rdd1 = rdd.map(f)
rdd1.cache()
rdd2 = rdd1.map(lambda x: -x)
print(rdd2.collect())
rdd3 = rdd1.map(lambda x: x+100)
print(rdd3.collect())
rdd1.persist()
rdd1
rdd1
rdd1
[-10, -20, -30]
[110, 120, 130]
此代码修改将f方法作为rdd1.map中的方法,通过运行代码可得,在对rdd1进行持久化的情况下,print("rdd1")只在第一次调用collect的时候执行,第二次调用collect时不被执行
|