IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink小知识--State Processor API的简单讲解(2) State的Writer -> 正文阅读

[大数据]Flink小知识--State Processor API的简单讲解(2) State的Writer

在上一章节中,我简单介绍了State 的读取操作
Flink小知识–State Processor API的简单讲解(1) State的读取
本章节将重点简述下 state 的写以及修改,主要以 Keyed State为例
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/

1. Writing New Savepoints

基于上一期的key state 案例,本期生成的state 数据结构将与上一期一致

  • Keyed State writer
{
		ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
        
        String oldsavepointPath ="file:///D:\\IDEAspaces\\bigdata_study\\bigdata_flink\\data\\checkpoint\\ce4e7457dfcd7bf92d046a0e70b4a992\\chk-1";
        String savepointPath ="file:///D:\\IDEAspaces\\bigdata_study\\bigdata_flink\\data\\checkpoint\\ce4e7457dfcd7bf92d046a0e70b4a992\\chk-3";
        int maxParallelism = 128;
        
		//测试离线数据demo
        DataSource<Tuple2<String,Integer>> fromCollection = bEnv.fromCollection(Arrays.asList(
                    Tuple2.of("a", 10),
                    Tuple2.of("b", 10),
                    Tuple2.of("d", 10)
                )
            );
            
		//依据离线数据获得最终 key 结果 
        BootstrapTransformation<Tuple2<String, Integer>> transform = OperatorTransformation
                .bootstrapWith(fromCollection)
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .transform(new keyBootstrapper());
                
         //创建新的savepoint 用于追数 注意并行度的选择
        Savepoint.create(new MemoryStateBackend(),maxParallelism)
                .withOperator("key_uid", transform)
                .write(savepointPath);

        //获取老的state 数据,加上 新增的 算子 uid 组成一个新的savepoint
//        Savepoint.load(bEnv,oldsavepointPath,new MemoryStateBackend())
//                .withOperator("key_uid2", transform)
//                .write(savepointPath);

        bEnv.execute();
}
   ?// 以下代码逻辑与执行作业中的 代码逻辑一致
    public static class keyBootstrapper extends KeyedStateBootstrapFunction<String, Tuple2<String,Integer>> {
        ValueState<Integer> state;
        ListState<Long> updateTimes;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
            state = getRuntimeContext().getState(stateDescriptor);

            ListStateDescriptor<Long> updateDescriptor = new ListStateDescriptor<>("times", Types.LONG);
            updateTimes = getRuntimeContext().getListState(updateDescriptor);
        }

        @Override
        public void processElement(Tuple2<String,Integer>  value, Context ctx) throws Exception {
            state.update(value.f1 + 1);
            updateTimes.add(System.currentTimeMillis());
              //可以注册processtime eventtime timer
//            ctx.timerService().registerEventTimeTimer(1000);
        }
    }
  • 读取结果验证结果集

在savepointPath 目录中新生成_metaata

KeyedStates{key='a', value=11, times=[1626158781896]}
KeyedStates{key='d', value=11, times=[1626158781896]}
KeyedStates{key='b', value=11, times=[1626158781896]}

2. Modifying Savepoints

获取老的state 数据,加上 新增的 算子 uid 组成一个新的savepoint

  //获取老的state 数据,加上 新增的 算子 uid 组成一个新的savepoint
        Savepoint.load(bEnv,oldsavepointPath,new MemoryStateBackend())
                .withOperator("key_uid2", transform)
                .write(savepointPath);
  • 结果
 // uid key_uid
KeyedStates{key='a', value=2, times=[1626157285704]}
KeyedStates{key='d', value=2, times=[1626157292202]}
KeyedStates{key='c', value=2, times=[1626157289201]}
 // uid key_uid2
KeyedStates{key='a', value=11, times=[1626158781896]}
KeyedStates{key='d', value=11, times=[1626158781896]}
KeyedStates{key='b', value=11, times=[1626158781896]}

在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-14 10:59:11  更:2021-07-14 11:01:58 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/28 9:50:53-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码