广播流与普通流JOIN图解
user actions 可以看作是事件流
patterns 为广播流,它的全量数据会被广播(复制)到每一个计算节点
普通双流 join 则是根据 join 条件,把两条流中 key 相同的数据发送到同一个计算节点,如下图所示
案例
package com.zxl.broadcasts;
import com.zxl.blink.DataDB;
import com.zxl.blink.Person;
import com.zxl.blink.StudentDB;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
/**
 * Demo job that joins a keyed stream of {@code Person} events against a broadcast stream of
 * {@code Tuple2<Integer, Long>} records by means of Flink broadcast state.
 *
 * <p>Every broadcast record is stored in a map-typed broadcast state under its {@code f0} field.
 * For each {@code Person} whose {@code pid} is present in that state, a {@code Tuple5} made of
 * the pid, the {@code f1} field of the stored record, and the person's page, name and sex is
 * emitted and printed.
 */
public class BroadcastDemo {

    /**
     * Builds and runs the job on a local environment with the web UI enabled.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Serve the Flink web UI on localhost:8848 so the job graph can be inspected.
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 8848);

        // Create the local execution environment.
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        environment.setParallelism(4);

        // First stream: custom source whose records will be broadcast.
        DataStream<Tuple2<Integer, Long>> studentSource = environment.addSource(new StudentDB());
        // Second stream: custom source of Person events (the non-broadcast side).
        DataStream<Person> dataStream = environment.addSource(new DataDB());

        // Partition the event stream by pid so that records with the same pid are handled
        // by the same parallel instance.
        KeyedStream<Person, Tuple> pidStream = dataStream.keyBy("pid");

        // Descriptor of the broadcast state: the state name plus the key and value types of
        // the map. This one instance is used both to create the broadcast stream and to look
        // the state up inside the process function, so the two can never get out of sync
        // (a mismatch fails at runtime with "The requested state does not exist").
        final MapStateDescriptor<Integer, Tuple2<Integer, Long>> student = new MapStateDescriptor<>(
                "student",
                BasicTypeInfo.INT_TYPE_INFO,
                TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {
                })
        );

        // Broadcast the first stream and register the broadcast state.
        BroadcastStream<Tuple2<Integer, Long>> broadcast = studentSource.broadcast(student);

        // Connect both streams and do the matching.
        // Type parameters of KeyedBroadcastProcessFunction:
        //   1. key type of the keyed stream (the SECOND type argument of KeyedStream;
        //      the first one is the element type)
        //   2. element type of the non-broadcast stream
        //   3. element type of the broadcast stream
        //   4. result type, here Tuple5<Integer, Long, Integer, String, String>
        DataStream<Tuple5<Integer, Long, Integer, String, String>> streamOperator = pidStream.connect(broadcast)
                .process(new KeyedBroadcastProcessFunction<Tuple, Person, Tuple2<Integer, Long>, Tuple5<Integer, Long, Integer, String, String>>() {

                    // processElement() handles the elements of the keyed (non-broadcast) stream.
                    @Override
                    public void processElement(Person person, ReadOnlyContext ctx, Collector<Tuple5<Integer, Long, Integer, String, String>> out) throws Exception {
                        Integer pid = person.getPid();
                        if (pid == null) {
                            return;
                        }
                        // Read-only view of the broadcast state; look the pid up exactly once.
                        ReadOnlyBroadcastState<Integer, Tuple2<Integer, Long>> state = ctx.getBroadcastState(student);
                        Tuple2<Integer, Long> matched = state.get(pid);
                        if (matched == null) {
                            // No broadcast record for this pid (yet): nothing to emit.
                            return;
                        }
                        System.out.println("匹配到" + person.toString());
                        out.collect(new Tuple5<Integer, Long, Integer, String, String>(pid, matched.f1, person.getPage(), person.getPname(), person.getPsex()));
                    }

                    // processBroadcastElement() handles the elements of the broadcast stream.
                    @Override
                    public void processBroadcastElement(Tuple2<Integer, Long> input, Context ctx, Collector<Tuple5<Integer, Long, Integer, String, String>> out) throws Exception {
                        // Store the newly broadcast record in the state, keyed by its f0 field.
                        ctx.getBroadcastState(student).put(input.f0, input);
                    }
                });

        // Print the joined records with the "broadcast" prefix.
        streamOperator.print("broadcast");

        // Run the job.
        environment.execute();
    }
}
在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 时传入的参数相同。也就是说,创建广播流时使用的 MapStateDescriptor 与在函数内部获取 broadcast state 时使用的 MapStateDescriptor,其名称必须一致,否则运行时会报如下错误:
The requested state does not exist. Check for typos in your state descriptor, or specify the state descriptor in the datastream.broadcast(…) call if you forgot to register it.
|