| 1、RDD性质 RDD数据是过程数据,即计算得到后一个RDD时,前一个RDD就会被释放。如图就是当RDD4被计算出来RDD3就会被释放。 2、为什么要持久化呢? 如图,一个应用程序中存在两个collect,中间链路都调用了RDD3,可是在第一个collect执行完时,由于RDD时过程数据,RDD3已经被释放,所以执行第二个collect需要重新计算RDD1、RDD2、RDD3,这样就浪费资源。为了避免重新计算RDD3,则对RDD3进行持久化。 3、RDD持久化方法 rdd3.cache()                            # 等效于下一行代码
rdd3.persist(StorageLevel.MEMORY_ONLY)    # 在内存中持久化
rdd3.persist(StorageLevel.DISK_ONLY)    # 在硬盘中持久化
 4、测试RDD持久化效果? from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    def f(x):
        print("rdd1")
        return x * 10
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3])
    rdd1 = rdd.map(f)
    rdd1.cache()
    rdd2 = rdd1.map(lambda x: -x)
    print(rdd2.collect())
    rdd3 = rdd1.map(lambda x: x+100)
    print(rdd3.collect())
    rdd1.persist()
 rdd1
rdd1
rdd1
[-10, -20, -30]
[110, 120, 130]
 此代码修改将f方法作为rdd1.map中的方法,通过运行代码可得,在对rdd1进行持久化的情况下,print("rdd1")只在第一次调用collect的时候执行,第二次调用collect时不被执行 |