广播变量:
- 如果使用了广播变量技术,则 Driver 端将共享数据只会发送到每个 Executor 一份。Executor 中的所有 Task 都复用这个对象。
- 如果不用广播变量技术,则 Driver 端默认会将共享数据分发到每个 Task 中,造成网络分发压力大。甚至导致你在进行RDD持久化到内存时,因内存不足而被迫存到磁盘,增加了磁盘IO,严重降低性能。
广播变量使用方法(Python实现):
-
要保证该共享对象是可序列化的。因为跨节点传输的数据都要是可序列化的。 -
在Driver端将共享对象广播到每个Executor: #2-定义一个列表,装特殊字符
list_v=[",", ".", "!", "#", "$", "%"]
#3-将列表从Driver端广播到各个Executor中
bc=sc.broadcast(list_v) - 在Executor中获取:
list2=bc.value
累加器:
????????Spark提供的 Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator 只提供了累加的功能,即确提供了多个 task 对一个变量并行操作的功能。但是 task 只能对 Accumulator 进行累加操作,不能读取 Accumulator 的值,只有 Driver 程序可以读取 Accumulator 的值。创建的 Accumulator 变量的值能够在 Spark Web UI 上看到,所以在创建时应尽量为其命名。
????????Spark内置了三种类型的Accumulator,分别是?LongAccumulator 累加整数型,DoubleAccumulator 累加浮点型,CollectionAccumulator 累加集合元素。
整数累加器使用方法(Python实现)?:
- 在Driver端定义整数累加器,赋初始值。
acc=sc.accumulator(0) - 在Task中每次累加1
acc.add(1)
# 或者
acc+=1
PySpark累加器和广播变量的综合案例:
- 案例:过滤非单词字符,并对非单词字符进行统计
- ?Python代码实现:
from pyspark import SparkConf, SparkContext, StorageLevel
import os,jieba,time,re
#指定环境变量
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python'
if __name__ == '__main__':
conf = SparkConf().setAppName('text1').setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2-定义一个列表,装特殊字符
list_v = [",", ".", "!", "#", "$", "%"]
# 3-将列表从Driver端广播到各个Executor中
bc = sc.broadcast(list_v)
# 4-定义累加器,后续在分布式task中,每遇到特殊字符,就累加1
acc = sc.accumulator(0)
# 5-加载文本内容,形成RDD
input_rdd=sc.textFile('file:///export/pyworkspace/sz27_spark/pyspark_core/data/accumulator_broadcast_data.txt')
# 6-过滤空行
filtered_rdd = input_rdd.filter(lambda line:len(line.strip())>0)
# 7-将每行长句子,按空白字符拆分成短字符串
str_rdd = filtered_rdd.flatMap(lambda line:re.split('\\s+',line))
# 8-对上面的RDD,过滤,每遇到特殊字符,累加器acc就累加1,并且剔除掉特殊字符,形成的RDD只包含单词
def filter_str(str):
global acc
#获取特殊字符列表
list2 = bc.value
if str in list2:
acc.add(1)
return False
else:
return True
word_rdd = str_rdd.filter(filter_str)
# 9-对单词进行计数,
wordcount_rdd = word_rdd.map(lambda word:(word,1)).reduceByKey(lambda x,y:x+y)
# 10-打印单词计数结果
print('单词记数结果:',wordcount_rdd.collect())
# 11-打印累加器的值,即特殊字符的个数
print('特殊字符累加器结果:',acc.value)
sc.stop()
|