目录
1 Flink 中的状态
2 算子状态(Operator State)
?2.1 算子状态类型
3 键控状态(keyed State)
?3.1 键控状态数据结构
3.2 键控状态使用
3.3? 实例
3.3.1 第一种实现方式
3.3.2 第二种实现方式
1 Flink 中的状态
- 由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态
- 可以认为状态就是一个本地变量,可以被任务的业务逻辑访问,(我们自己定义的简单的本地变量是不可以的,因为flink是分布式的,要保证状态一致性需要序列化反序列化,flink底层已经实现一套机制。)
- Flink会进行状态管理,包括状态一致性,故障处理以及高效存储和访问,以便开发人员专注于应用程序的逻辑
- ?其实对于一些简单的算子比如map filter 可以没有状态,一些复查的算子比如reduce,比如窗口计算或者一些聚合计算需要有状态。flink 计算结果的数据实际上就是依赖2份数据了,第一份就是新输入的数据,第二份就是上一次计算过得保存成一个变量的数据(这个就是状态)
在flink中,状态始终与特定算子相关联,
为了使运行的flink了解算子的状态,算子需要预先注册其状态
总的来说有两种类型状态:
算子状态(operator State)
? ? ? 算子状态的作用范围限定为算子任务
键控状态(keyed State)
? ? ?根据输入数据流中的定义的键(key)来维护和访问
2 算子状态(Operator State)
- 算子状态的作用范围限定为算子任务,由同一并行任务所处理的所有数据都可以访问到相同的状态。
- 状态对同一子任务而言是共享的,也就是如上图上面一个task1里的所有数据都可以访问上面一个task1的状态。
- 算子啊状态不能有相同或者不同算子的另一个子任务访问。 也就是如上图上面一个task1的子任务的不能访问下面task子任务的状态,反之亦然。
?2.1 算子状态类型
① 列表状态(list state)
将状态表示为一组数据的列表
② 联合列表状态(Union list state)
也将状态表示为数据的列表,它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序是如何恢复
③ 广播状态(brodcast state)
如果一个算子有多个任务,而他的每个任务的状态又都相同,那么这种特殊情况最适合使用广播状态。
3 键控状态(keyed State)
重点使用
- 键控状态是根据输入数据流中定义的键来维护和访问的。
- flink 为每一个key维护一个状态实例,并将具有相同键的所有数据,都分区到相同一个算子任务中,这个任务会维护和处理这个key对应的状态
- 当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。
?3.1 键控状态数据结构
① 值状态(value state)
将状态表示成单个的值
② 列表状态(list state)
将状态表示成一组数据列表
③ 映射状态(map state)
将状态表示为一组key-value对
聚合状态(reduce state & aggregating state)
将状态表示为一个用于集合操作的列表
3.2 键控状态使用
从上面使用状态过程可以看出,使用的过程中使用了运行时上下文,那么运行时上下文,在RichFunction是可以获取运行时上下文的,键控状态的使用位置也是在RichFunction内使用的,这是前提
举例测试代码
package com.study.liucf.unbounded.state
import java.util.Map
import java.{lang, util}
import com.study.liucf.bean.LiucfSensorReding
import com.study.liucf.unbounded.transform.LiucfReduce
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor, ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
* @Author liucf
* @Date 2021/10/13
* 状态编程
*/
object LiucfStateTest {
def main(args: Array[String]): Unit = {
/**创建flink流式处理环境*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//为从此环境创建的所有流设置时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
/**flink 从外部命令获取参数*/
val paramTool = ParameterTool.fromArgs(args)
val ip = paramTool.get("ip")
val port = paramTool.getInt("port")
/**监听一个流式输入端口*/
// val sockerInput: DataStream[String] = env.socketTextStream("192.168.109.151", 9999)
val sockerInput: DataStream[String] = env.socketTextStream(ip, port)
/**数据流转换*/
val transRes = sockerInput
.map(d=>{
val arr = d.split(",")
LiucfSensorReding(arr(0),arr(1).toLong,arr(2).toDouble)
})
/**输出结果,打印到控制台*/
transRes.print("result")
/**启动流式处理*/
env.execute(" liucf watermark test")
}
}
/**
* 注意keyed state 必须定义和使用在RichFunction里,因为要使用运行时上下文
*/
class LiucfRichMapFunction extends RichMapFunction[LiucfSensorReding,Tuple2[String,Double]]{
/**① ValueState 定义,这里先声明,然后再声明周期函数open 里定义*/
var valueState:ValueState[Double] = _
/**② ListState 这里使用lazy 方式定义,在真正使用的时候才会去定义,所以一定是生命周期开始以后调用,此时运行时上下文已经被实例化了*/
lazy val listState:ListState[Long] = getRuntimeContext.getListState(new ListStateDescriptor("listState",classOf[Long]))
/**③ MapState*/
lazy val mapState:MapState[String,LiucfSensorReding] = getRuntimeContext.getMapState(new MapStateDescriptor("mapState",classOf[String],classOf[LiucfSensorReding]))
/**④ ReduceState*/
lazy val reduceState:ReducingState[LiucfSensorReding] = getRuntimeContext.getReducingState(new ReducingStateDescriptor("reduceState",new LiucfReduce(),classOf[LiucfSensorReding]))
override def open(parameters: Configuration): Unit = {
getRuntimeContext.getState(new ValueStateDescriptor("valueState",classOf[Double]))
}
override def map(value: LiucfSensorReding): (String, Double) = {
/**① 使用 valuestate*/
val valueStateV = valueState.value() // 取出valuestate 值
valueState.update(value.temperature) // 更新valuestate 值
/**② 使用 ListState*/
val listStateIterable = listState.get() // 取出listState列表
val lst: util.ArrayList[Long]= new util.ArrayList()
lst.add(10000L)
lst.add(2000L)
listState.addAll(lst) // 更新listState-追加一个列表进入原状态列表
listState.update(lst) // 更新listState-覆盖更新一个列表进入状态列表
listState.add(value.timestamp) // 更新listState-追加1个元素进入状态列表
/**③ 使用 MapState*/
mapState.contains(value.id) // 判断key中是否包含
val iterable: lang.Iterable[Map.Entry[String, LiucfSensorReding]] = mapState.entries() // 取出valuestate 整个集合
val reding: LiucfSensorReding = mapState.get(value.id) // 取出valuestate ke-value中的value
mapState.put(value.id,value) // 更新mapState-追加1个元素进入状态列表
/**④ 使用 ReduceState*/
val liucfSensorReding = reduceState.get() // 取出reduceState 值
reduceState.add(value) // 更新reduceState 值
(value.id,value.timestamp)
}
}
3.3? 实例
需求:
? ? ?每个传感器采集到的温度前5条记录求平均值,第六条开始,如果采集到的温度与之前记录的平均值的差不大于10度,则该条采集记录参与平均值重新计算;如果采集到的温度与之前记录的平均值的差大于等于10度,则进行抖动过大告警,且这条采集记录不参与平均值计算。
3.3.1 第一种实现方式
package com.study.liucf.unbounded.state
import java.util.Map
import java.{lang, util}
import com.study.liucf.bean.{LiucfSensorReding, LiucfSensorRedingAgg}
import com.study.liucf.unbounded.transform.LiucfReduce
import org.apache.flink.api.common.functions.{ReduceFunction, RichFilterFunction, RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor, ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
* @Author liucf
* @Date 2021/10/13
* 状态编程
* 需求:
* 每个传感器采集到的温度前5条记录求平均值,第六条开始,如果采集到的温度与之前记录的平均值的差不大于10度,
* 则该条采集记录参与平均值重新计算;如果采集到的温度与之前记录的平均值的差大于等于10度,则进行抖动过大告警,
* 且这条采集记录不参与平均值计算。
*/
object LiucfStateDemo1 {
val EXAMPLE_SIZE=5L
def main(args: Array[String]): Unit = {
/**创建flink流式处理环境*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//为从此环境创建的所有流设置时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
/**flink 从外部命令获取参数*/
val paramTool = ParameterTool.fromArgs(args)
val ip = paramTool.get("ip")
val port = paramTool.getInt("port")
/**监听一个流式输入端口*/
// val sockerInput: DataStream[String] = env.socketTextStream("192.168.109.151", 9999)
val sockerInput: DataStream[String] = env.socketTextStream(ip, port)
/**数据流转换*/
val transRes = sockerInput
.map(d=>{
val arr = d.split(",")
LiucfSensorRedingAgg(arr(0),arr(1).toLong,arr(2).toDouble,0.0)
})
.keyBy(_.id)
.flatMap(new LiucfFlatMapRichFunction(EXAMPLE_SIZE))
/**输出结果,打印到控制台*/
transRes.print("result")
/**启动流式处理*/
env.execute(" liucf watermark test")
}
}
class LiucfFlatMapRichFunction(exampleSize:Long) extends RichFlatMapFunction[LiucfSensorRedingAgg,LiucfSensorRedingAgg]{
/**声明一个值状态*/
lazy val sensorCounterValueState:ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("sensorCounterValueState",classOf[Long]))
/**声明一个reduce状态*/
lazy val sensorReduceState:ReducingState[LiucfSensorRedingAgg] = getRuntimeContext.getReducingState(new ReducingStateDescriptor[LiucfSensorRedingAgg]("sensorReduceState",new LiucfReduceFunction(),classOf[LiucfSensorRedingAgg]))
override def flatMap(value: LiucfSensorRedingAgg, out: Collector[LiucfSensorRedingAgg]): Unit = {
/**获取当前的值状态值*/
var sensorCounter = sensorCounterValueState.value()
/**逻辑处理分为采样期和告警期两个分支*/
if(sensorCounter<=exampleSize){//前exampleSize条采集记录只做为采样: 计数和reduce相加,不做告警
sensorCounter += 1 // 新采集的数据到来是,当前计数的状态值+1
sensorCounterValueState.update(sensorCounter)//更新状态值为+1后的值
if(null == sensorReduceState.get()){ //第一条采集的数据到达时因为是初始的reducestate 因此状态值时null,需要特殊处理
/**第一条数据到达时初始化reduce状态值*/
val sensor = LiucfSensorRedingAgg(value.id,value.timestamp,value.temperature,value.temperature)
sensorReduceState.add(sensor)
}else{// 不是当前传感器的采集的第一条数据时,把新到达的数据传入reduce,然后就会和之前的状态值记录的值进行聚合操作。
sensorReduceState.add(value)
}
}else{ // 采样已经结束,进入告警期,然后开始安装规则判断告警和计算
/**获取当前reduce状态值,也就是能拿到之前到达的满足条件的温度的总和*/
val aggTemperature = sensorReduceState.get().target
/**之前到达的满足条件的温度的总和与采集到的满足条件的温度数据条数做除法求平均温度*/
val avgTemperature = aggTemperature / sensorCounter
if((avgTemperature-value.temperature).abs>5){
/**波动超过5度进行告警,输出当前采集的温度数据和当前的平均温度值*/
out.collect(LiucfSensorRedingAgg(value.id,value.timestamp,value.temperature,avgTemperature))
}else{
/**不满足告警条件的重新计算温度总和和采集到的条数(不包含告警的那条)*/
sensorReduceState.add(value)
sensorCounter += 1
sensorCounterValueState.update(sensorCounter)
}
}
}
}
/**自定义reduce*/
class LiucfReduceFunction() extends ReduceFunction[LiucfSensorRedingAgg]{
/**
* 两个元素进行reduce
* @param value1 上一次计算的结果,对与reducestates使用时表示当前状态
* @param value2 这次新传入的数据,新采集到的温度数据
* @return 返回本次计算的结果,reducestate状态值和新采集到的温度数据进行聚合处理后作为新的状态被保存,之前的状态被更新成最新的。
*/
override def reduce(value1: LiucfSensorRedingAgg, value2: LiucfSensorRedingAgg): LiucfSensorRedingAgg = {
val newReduceStateValue = LiucfSensorRedingAgg(value1.id,value2.timestamp,value2.temperature,value1.target+value2.temperature)
printf("Former state: %s,new date: %s,new state:%s \n",value1,value2,newReduceStateValue)
newReduceStateValue
}
}
测试输入:
? nc -l 9999
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,10
sensor_4,1630851514,45
测试输出:
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,40.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,80.0)
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,80.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,120.0)
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,120.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,160.0)
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,160.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,200.0)
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,200.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,240.0)
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,240.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,280.0)
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,280.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,320.0)
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,320.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,360.0)
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,360.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,40.0,400.0)
result> LiucfSensorRedingAgg(sensor_4,1630851514,10.0,40.0)
Former state: LiucfSensorRedingAgg(sensor_4,1630851514,40.0,400.0),new date: LiucfSensorRedingAgg(sensor_4,1630851514,45.0,0.0),new state:LiucfSensorRedingAgg(sensor_4,1630851514,45.0,445.0)
3.3.2 第二种实现方式
package com.study.liucf.unbounded.state
import java.util.Map
import java.{lang, util}
import com.study.liucf.bean.{LiucfSensorReding, LiucfSensorRedingAgg}
import com.study.liucf.unbounded.transform.LiucfReduce
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor, ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
* @Author liucf
* @Date 2021/10/13
* 状态编程
*/
object LiucfStateDemo2 {
val EXAMPLE_SIZE=5L
def main(args: Array[String]): Unit = {
/**创建flink流式处理环境*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//为从此环境创建的所有流设置时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
/**flink 从外部命令获取参数*/
val paramTool = ParameterTool.fromArgs(args)
val ip = paramTool.get("ip")
val port = paramTool.getInt("port")
/**监听一个流式输入端口*/
// val sockerInput: DataStream[String] = env.socketTextStream("192.168.109.151", 9999)
val sockerInput: DataStream[String] = env.socketTextStream(ip, port)
/**数据流转换*/
val transRes = sockerInput
.map(d=>{
val arr = d.split(",")
LiucfSensorReding(arr(0),arr(1).toLong,arr(2).toDouble)
})
.keyBy(_.id)
.flatMapWithState[LiucfSensorRedingAgg,(Long,Double)]({//[LiucfSensorRedingAgg,(Long,Double)] 表示 [返回值类型,状态类型]
//(输入数据,状态类型) => (返回数据:第一条数据不返回,状态数据)
case (data:LiucfSensorReding,None)=>(List.empty,Some((1,data.temperature)))
//(输入数据,状态类型) => {处理过程,返回结果}
case (data:LiucfSensorReding,lastState:Some[(Long,Double)])=>{
val lastStatValue = lastState.get
if(lastStatValue._1<=EXAMPLE_SIZE){//EXAMPLE_SIZE次以内采集期:不输出告警,状态里的采集条数和温度总和进行统计
(List.empty,Some((lastStatValue._1+1,lastStatValue._2+data.temperature)))
}else{//告警期
val temperatureAvg = lastStatValue._2/lastStatValue._1 //平均温度
if((data.temperature-temperatureAvg).abs>5){
//满足条件输出告警,当前状态保持和上一次一样不变
(List(LiucfSensorRedingAgg(data.id,data.timestamp,data.temperature,temperatureAvg)),lastState)
}else{
//不满足告警条件,继续求温度和与采集的条数
(List.empty,Some((lastStatValue._1+1,lastStatValue._2+data.temperature)))
}
}
}
})
/**输出结果,打印到控制台*/
transRes.print("result")
/**启动流式处理*/
env.execute(" liucf watermark test")
}
}
测试输入:
? nc -l 9999
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,40
sensor_4,1630851514,10
sensor_4,1630851514,45
测试输出:
result> LiucfSensorRedingAgg(sensor_4,1630851514,10.0,40.0)
可见方式一和方式二结果是一样的。
|