广播变量&分布式缓存
1.广播变量
我们知道Flink是并行的,计算过程可能不在一个 Slot 中进行,那么有一种情况即:当我们需要访问同一份数据。 那么Flink中的广播变量就是为了解决这种情况。我们可以把广播变量理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
DataSet<String> data = env.fromElements("a", "b");
data.map(new RichMapFunction<String, String>() {
@Override
public void open(Configuration parameters) throws Exception {
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
}
@Override
public String map(String value) throws Exception {
...
}
}).withBroadcastSet(toBroadcast, "broadcastSetName");
提示:
广播变量被保存在每个节点的内存中,因此不宜过大。
2.分布式缓存
Flink实现的分布式缓存和Hadoop有异曲同工之妙。目的是在本地读取文件,并把他放在 taskmanager 节点 中,防止task重复拉取。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();
public final class MyMapper extends RichMapFunction<String, Integer> {
@Override
public void open(Configuration config) {
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
...
}
@Override
public Integer map(String value) throws Exception {
...
}
}
提示:
用户自定义函数必须继承RichFunction方法,因为需要访问RuntimeContext
|