IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink中基于Operator State 的计算开发方法 -> 正文阅读

[大数据]Flink中基于Operator State 的计算开发方法

文:王东阳

前言

在Flink中根据数据集是否根据Key进行分区,将状态分为Keyed StateOperator State(Non-keyed State)两种类型 ,在之前的文章《Flink中基于KeyedState的计算开发方法》已经详细介绍了Keyed State的概念和用法,本文将继续介绍Operator State

Operator StateKeyed State不同的是,Operator State只和并行的算子实例绑定,和数据元素中的key无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator State支持当算子实例并行度发生变化时自动重新分配状态数据, OperatorState目前只支持使用ListState。

Operator State与并行的操作算子实例相关联,例如在Kafka Connector中,每个Kafka消费端算子实例都对应到Kafka的一个分区中,维护Topic分区和Offsets偏移量作为算子的Operator State 在Flink中可以通过 Checkpointed-Function 或者 ListCheckpointed<T extends Serializable>两个接口来定义操作Operator State的函数。

Operator State开发实战

本章节将通过实际的项目代码演示Operator State在两种不同计算场景下的开发方法。

在样例中将演示Operator State如何融合进入Flink 的DataStream API,让用户在开发Flink应用的时候,可以将临时数据保存在State中,从State中读取数据,在运行的时候,在运行层面上与算子、Function体系融合,自动对State进行备份Checkpoint,一旦出现异常能够从保存的State中恢复状态,实现Exactly-Once 。

通过CheckpointedFunction接口操作Operator State

CheckpointedFunction接口定义如代码所示,需要实现两个方法,当checkpoint触发时就会调用snapshotState()方法,当初始化自定义函数的时候会调用initializeState()方法,其中包括第一次初始化函数和从之前的checkpoints中恢复状态数据,同时initializeState()方法中需要包含两套逻辑,

  • 一个是不同类型状态数据初始化的逻辑,
  • 一个是从之前的状态中恢复数据的逻辑
@Public
public interface CheckpointedFunction {
    // 每当 checkpoint 触发的时候 调用这个方法
  void snapshotState(FunctionSnapshotContext var1) throws Exception;

    // 每次 自定义函数初始化的时候 调用此方法初始化
  void initializeState(FunctionInitializationContext var1) throws Exception;
}

在每个算子中Managed Operator State都是以List形式存储,算子和算子之间的状态数据相互独立,List存储比较适合于状态数据的重新分布,Flink目前支持对Managed Operator State两种重分布的策略,分别是Even-split Redistribution和Union Redistribution。

  • Even-split Redistribution:每个算子实例中含有部分状态元素的List列表,整个状态数据是所有List列表的合集。当触发 restore/redistribution 动作时,通过将状态数据平均分配成与算子并行度 相同数量的List列表,每个task实例中有一个List,其可以为空或者含有多个元素
  • Union Redistribution:每个算子实例中含有所有状态元素的List列表,当触发 restore/redistribution 动作时,每个算子都能够获取到完整的状态元素列表

实现FlatMapFunction和CheckpointedFunction

在实际项目中可以通过实现FlatMapFunctionCheckpointedFunction完成对输入数据中每个key的数据元素数量和算子中的元素数量的统计。如代码所示,通过在initializeState()方法中分别创建keyedStateoperatorState两种State,存储基于Key相关的状态值以及基于算子的状态值。

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;

public class CheckpointCount
    implements FlatMapFunction<Tuple2<Integer, Long>, Tuple3<Integer, Long, Long>>, CheckpointedFunction {

  private static final Logger logger = Logger.getLogger(CheckpointCount.class);

  private Long operatorCount;
  private ValueState<Long> keyedState;
  private ListState<Long> operatorState;

  public void CheckpointCount() {

  }

  @Override
  public void flatMap(
      Tuple2<Integer, Long> integerLongTuple2, Collector<Tuple3<Integer, Long, Long>> collector) throws Exception {

    if (integerLongTuple2.f0 == 4) {
      throw new IOException("input ");
    }

    if (keyedState.value() == null) {
      keyedState.update(1L);
    } else {
      keyedState.update(keyedState.value() + 1L);
    }

    operatorCount = operatorCount + 1;
    //输出结果,包括id,id对应的数量统计keyedCount,算子输入数据的数量统计 operatorCount
    collector.collect(Tuple3.of(integerLongTuple2.f0, keyedState.value(), operatorCount));
  }

  @Override
  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    System.out.println("snapshot");
    operatorState.clear();
    operatorState.add(operatorCount);
  }

  @Override
  public void initializeState(FunctionInitializationContext ctx) throws Exception {
    System.out.println("initialize");
    logger.debug("init");
    keyedState = ctx.getKeyedStateStore().getState(new ValueStateDescriptor<Long>("keyedState", Long.class));
    operatorState = ctx.getOperatorStateStore().getListState(new ListStateDescriptor<Long>(
        "operatorState",
        Long.class));
    operatorCount = 0L;

    if (ctx.isRestored()) {
      List<Long> op = Lists.newArrayList(operatorState.get());
      if (op.size() > 0 ) {
        operatorCount = op.get(op.size()-1);
      }
      System.out.println("restored");
    }
  }
}

代码地址:CheckpointCount.java

可以从上述代码中看到的是,在 snapshotState() 方法中清理掉上一次checkpoint中存储的operatorState的数据,然后再添加并更新本次算子中需要checkpointoperatorCount状态变量。当系统重启时会调用initializeState方法,重新恢复keyedStateoperatorState,其中operatorCount数据可以从最新的operatorState中恢复。

验证代码

构建验证代码如下:

  private static void checkpointOperatorStateWithMapFunction() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(3000); // if commit this line off, only got initializeState
    // env.getCheckpointConfig().set

    DataStreamSource<String> localhost = env.socketTextStream("localhost", 1111);
    SingleOutputStreamOperator<Tuple2<Integer, Long>>  inputStream= localhost.map(new MapFunction<String,
        Tuple2<Integer, Long>>() {
      @Override
      public Tuple2<Integer, Long> map(String s) throws Exception {
        String[] split = s.split(",");
        return new Tuple2<>(Integer.valueOf(split[0]), Long.valueOf(split[1]));
      }
    });

    inputStream.keyBy(0).flatMap(new CheckpointCount()).print();
    env.execute("checkpoint state for map");
  }

代码地址:checkpointOperatorStateWithMapFunction

通过***nc***,我们输入以下数据

DESKTOP-SPIDEIC:~$ nc -lk 1111
2,1
2,2
2,3
3,1
3,2
3,3

得到打印输出

initialize
snapshot
snapshot
snapshot
(2,1,1)
(2,2,2)
(2,3,3)
snapshot
(3,1,4)
(3,2,5)
snapshot
(3,3,6)
snapshot

可以看到

  • 由于 keyedState 是跟key相关的,所以当integerLongTuple2.f0从2变为3的时候, keyedState 是重新初始化,从1开始递增
  • 由于 operatorState只跟算子相关的,所以一直在递增
  • 由于代码中使用 env.enableCheckpointing(3000) 开启了checkpoint,可以看到 snapshotState 中的日志打印出来

我们等(3,3,6)后面的snapshot打印出来后,接下来通过***nc***继续输入

 4,3

由于我们在代码设置当 if (integerLongTuple2.f0 == 4) 的时候抛出异常,所以此刻flink程序就会退出,然后重启,进入到 initializeState

看到对应的打印如下

initialize
restored
snapshot
snapshot

可以看到初始化(initialize)以及恢复(restored)的逻辑都执行到了,我们通过nc继续输入 3,5 ,可以看到程序打印出 (3,4,7) , 说明 keyedState 以及operatorCount 都正常恢复了之前的值。

对于状态数据重分布策略的使用,可以在创建operatorState的过程中通过相应的方法指定:如果使用Even-split Redistribution
略,则通过context. getListState(descriptor)获取OperatorState;如果使用Union Redistribution策略,则通过context.getUnionList State(descriptor) 来获取。实例代码中默认使用的Even-split Redistribution策略。

通过CheckpointedFunction构建带缓冲区的Sink

下面我们再看另外一个例子,构建一个带缓冲的Sink。如代码所示,通过checkpointState保存当前已经接受的所有元素列表。

实现SinkFunction和CheckpointedFunction

  • snapshotState()方法中清理掉上一次checkpoint中存储的operatorState的数据,然后再添加本次算子中需要checkpoint的bufferedElements中的每一个元素。

  • 当系统重启时会调用initializeState方法,重新恢复operatorState,其中bufferedElements中的数据可以从 checkpointState 中恢复

  • invoke 会在算子收到每一个元素时调用

  • finish 会在算子收到上游所有元素后调用

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;


public class BufferingSink
    implements SinkFunction<Tuple2<Integer, Long>>,
    CheckpointedFunction {

  private final int threshold;
  private transient ListState<Tuple2<Integer, Long>> checkpointState;
  private List<Tuple2<Integer, Long>> bufferedElements;

  public BufferingSink(int threshold){
    this.bufferedElements = new ArrayList<>();
    this.threshold = threshold;
  }

  @Override
  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    System.out.println("snapshot");
    checkpointState.clear();
    // for (Tuple2<Integer, Long> element: bufferedElements) {
    //   checkpointState.add(element);
    // }
    checkpointState.addAll(bufferedElements);
  }

  @Override
  public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    System.out.println("initialize");
    ListStateDescriptor<Tuple2<Integer, Long>> descriptor =
        new ListStateDescriptor<Tuple2<Integer, Long>>(
            "buffered-elements",
            TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})
        );

    checkpointState = functionInitializationContext.getOperatorStateStore().getListState(descriptor);

    if (functionInitializationContext.isRestored()) {
      for (Tuple2<Integer, Long> element : checkpointState.get()){
        bufferedElements.add(element);
      }
    }

  }

  @Override
  public void invoke(
      Tuple2<Integer, Long> value, Context context) throws Exception {
    // called for each element
    // SinkFunction.super.invoke(value, context);
    System.out.println(String.format("recv %d %d", value.f0, value.f1));
    bufferedElements.add(value);
    if (value.f0 == 4) {
      throw new IOException("input ");
    }

  }

  @Override
  public void finish() throws Exception {
    // called when every element has received
    // SinkFunction.super.finish();
    for (Tuple2<Integer, Long> element: bufferedElements) {
      System.out.println(element);
    }
  }
}

代码地址:CheckpointBufferingSink.java

验证代码

构建验证程序如下:

  private static void checkpointListStateWithSinkFunction() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(3000); // if commit this line off, only got initializeState

    DataStreamSource<String> localhost = env.socketTextStream("localhost", 1111);
    SingleOutputStreamOperator<Tuple2<Integer, Long>>  inputStream= localhost.map(new MapFunction<String,
        Tuple2<Integer, Long>>() {
      @Override
      public Tuple2<Integer, Long> map(String s) throws Exception {
        String[] split = s.split(",");
        return new Tuple2<>(Integer.valueOf(split[0]), Long.valueOf(split[1]));
      }
    });

    inputStream.addSink(new BufferingSink(2));
    env.execute("checkpoint state for sink");
  }

代码地址:checkpointListStateWithSinkFunction

通过 nc 输入如下数据

wdy@DESKTOP-SPIDEIC:~$ nc -lk 1111
2,1
2,2
2,3

flink程序输出如下打印:

initialize
snapshot
recv 2 1
recv 2 2
snapshot
recv 2 3
snapshot

recv 2 3 之后看到 snapshot 打印出来后,用于确保最后一条(2,3)也保存到了checkpoint中,通过 nc 输入

4,0

由于我们在代码设置当 if (integerLongTuple2.f0 == 4) 的时候抛出异常,所以此刻flink程序就会退出,然后重启,进入到 initializeState

flink程序输出如下打印:

recv 4 0
initialize
restored
snapshotrecv 4 0
initialize
restored
snapshot

可以看到 正确进入到initializeState 并且执行了恢复逻辑,接下来通过 nc 输入

2,4

flink程序输出如下打印

recv 2 4
snapshot

接下来 Ctrl+C 停止 nc ,进入到 finish()函数中,flink程序输出

(2,1)
(2,2)
(2,3)
(2,4)

标明flink异常退出重启后,正确从checkpointState恢复了之前的数据。

通过ListCheckpointed接口定义Operator State

ListCheckpointed接口和CheckpointedFunction接口相比在灵活性上相对弱一些,只能支持List类型的状态,并且在数据恢复的时候仅支持even-redistribution策略。在ListCheckpointed接口中需要实现以下两个方法来操作Operator State:

public interface ListCheckpointed<T extends Serializable> {
  List<T> snapshotState(long var1, long var3) throws Exception;

  void restoreState(List<T> var1) throws Exception;
}

其中snapshotState方法定义数据元素List存储到checkpoints的逻辑,restoreState方法则定义从checkpoints中恢复状态的逻辑。如果状态数据不支持List形式,则可以在snapshotState方法中返回Collections.singletonList(STATE)

这个接口在Flink 1.14中已经不建议使用,所以本文也不再进行实例演示。

总结

本文介绍了OperateStateMapFunction以及SlinkFunction两种操作场景中的应用,同时展示了如何通过结合CheckpointedFunction自动对State进行备份Checkpoint,从而在任务出现异常时能够从保存的State中恢复状态,实现Exactly-Once。

参考资料

  • 《Flink原理、实战与性能优化》5.1 有状态计算
  • 《Flink内核原理与实现》第7章 状态原理
  • Flink教程(17) Keyed State状态管理之AggregatingState使用案例 求平均值 https://blog.csdn.net/winterking3/article/details/115133519
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-24 00:37:48  更:2022-03-24 00:41:12 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 16:09:05-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码