IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink 广播流Broadcast State 模式 -> 正文阅读

[大数据]Flink 广播流Broadcast State 模式

广播流与普通流JOIN图解

user actions 可以看作是事件流

patterns 为广播流,把全量数据加载到不同的计算节点
在这里插入图片描述

普通双流join
根据join 条件,根据key的发到同一个计算节点,如下图类似

在这里插入图片描述

案例

package com.zxl.broadcasts;

import com.zxl.blink.DataDB;
import com.zxl.blink.Person;
import com.zxl.blink.StudentDB;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;


public class BroadcastDemo {
    public static void main(String[] args) throws Exception {
        //配置FLINK WEB UI 可以登入localhost:8848 查看flink运行图
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT,8848);
        //创建执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        environment.setParallelism(4);
        //调用自定义函数形成streamone
        DataStream<Tuple2<Integer, Long>> studentSource = environment.addSource(new StudentDB());
        //调用自定义函数形成streamtwo
        DataStream<Person> dataStream = environment.addSource(new DataDB());
        // TODO: 2022/2/17 首先使用id将流进行进行分区(keyBy),这能确保相同id的数据会流转到相同的物理机上。
        KeyedStream<Person, Tuple> pidStream= dataStream.keyBy("pid");
        // TODO: 2022/2/17  定义一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构 key的类型和value的类型
        MapStateDescriptor<Integer, Tuple2<Integer, Long>> student = new MapStateDescriptor<>(
                "student",
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.of(new TypeHint<Tuple2<Integer, Long>>() {
                })
        );
        // TODO: 2022/2/17   广播流,广播规则并且创建 broadcast state
        BroadcastStream<Tuple2<Integer, Long>> broadcast = studentSource.broadcast(student);
        // TODO: 2022/2/17 将两个流关联起来 完成匹配
        // // KeyedBroadcastProcessFunction 中的类型参数表示:
        //   1. key stream 中的 key 类型
        // ** KeyedStream泛型里面的第二个值是key,第一个值是元素类型
        //   2. 非广播流中的元素类型
        //   3. 广播流中的元素类型
        //   4. 结果的类型,在这里是 string
        DataStream<Tuple5<Integer, Long, Integer, String, String>> streamOperator = pidStream.connect(broadcast)
                .process(new KeyedBroadcastProcessFunction<Tuple, Person, Tuple2<Integer, Long>, Tuple5<Integer, Long, Integer, String, String>>() {

                    //定义map 状态来存储广播流数据
                    MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor;

                    //初始化state
                    //在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同。
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        mapStateDescriptor = new MapStateDescriptor<>("student", BasicTypeInfo.INT_TYPE_INFO, TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {}));
                    }

                    // processBroadcastElement() 负责处理广播流的元素
                    @Override
                    public void processElement(Person person, ReadOnlyContext ctx, Collector<Tuple5<Integer, Long, Integer, String, String>> out) throws Exception {
                        // 处理每一个元素,看state是否有匹配的,有的话,下发到下一个节点
                        ReadOnlyBroadcastState<Integer, Tuple2<Integer, Long>> state = ctx.getBroadcastState(mapStateDescriptor);
                        if ((person.getPid() != null && state.get(person.getPid()) != null)) {
                            System.out.println("匹配到" + person.toString());
                            out.collect(new Tuple5<Integer, Long, Integer, String, String>(person.getPid(), state.get(person.getPid()).f1, person.getPage(), person.getPname(), person.getPsex()));
                        }
                    }

                    // processElement() 负责处理另一个流的元素
                    @Override
                    public void processBroadcastElement(Tuple2<Integer, Long> input, Context ctx, Collector<Tuple5<Integer, Long, Integer, String, String>> out) throws Exception {
                        // 新增加的广播元素,放入state中
                        //System.out.println("新增加需要监控的" + input.toString());
                        ctx.getBroadcastState(mapStateDescriptor).put(input.f0, input);
                    }
                });
        //打印结果
        streamOperator.print("broadcast");
        //执行程序
        environment.execute();
    }
}

在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同。
也就是描述了用于存储规则的和初始化state用于存储广播变量的名称一定要相同,不然就会报错。

The requested state does not exist. Check for typos in your state descriptor, or specify the state descriptor in the datastream.broadcast(…) call if you forgot to register it.

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-19 01:14:20  更:2022-02-19 01:14:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 11:43:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码