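1. Introduction
Flink provides a queryable state mechanism: a third-party application can read the state held inside a running Flink job, that is, the results (state) that Flink has computed so far.
1.1 Use case
Before a user logs in, the login service reads the relevant state from Flink to judge whether this login attempt is risky.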
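2. Architecture
Notes:
- The queryable state architecture has three basic components:
  QueryableStateClient: the third-party program; it runs outside the Flink cluster and is not part of the Flink runtime
  QueryableStateClientProxy: part of the Flink runtime; it runs on the TaskManager and handles client requests
  QueryableStateServer: part of the Flink runtime; it is the server side of the query and holds the queryable state
- How a query is executed:
  1. The client sends a query request, identified by a key, to the QueryableStateClientProxy on a TaskManager
  2. The QueryableStateClientProxy asks the JobManager which TaskManager stores the state for that key
  3. The proxy then fetches the state for that key from the QueryableStateServer on that TaskManager and returns it to the client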
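The lookup path described above can be pictured with a small in-memory model. The sketch below is plain Java and uses no Flink classes: StateServer, LocationDirectory, and ClientProxy are hypothetical stand-ins for QueryableStateServer, the JobManager's location lookup, and QueryableStateClientProxy, and the TaskManager ids and counts are made-up sample data.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model of the query path; none of these classes belong to Flink.
class QueryRoutingSketch {

    // Stand-in for a QueryableStateServer: the state kept on one TaskManager.
    static final class StateServer {
        private final Map<String, Integer> stateByKey = new HashMap<>();

        void put(String key, int value) { stateByKey.put(key, value); }

        Optional<Integer> get(String key) { return Optional.ofNullable(stateByKey.get(key)); }
    }

    // Stand-in for the JobManager's knowledge of which TaskManager owns a key.
    static final class LocationDirectory {
        private final Map<String, String> taskManagerByKey = new HashMap<>();

        void register(String key, String taskManagerId) { taskManagerByKey.put(key, taskManagerId); }

        Optional<String> locate(String key) { return Optional.ofNullable(taskManagerByKey.get(key)); }
    }

    // Stand-in for the QueryableStateClientProxy that receives the client request.
    static final class ClientProxy {
        private final LocationDirectory directory;
        private final Map<String, StateServer> serversByTaskManager;

        ClientProxy(LocationDirectory directory, Map<String, StateServer> serversByTaskManager) {
            this.directory = directory;
            this.serversByTaskManager = serversByTaskManager;
        }

        Optional<Integer> query(String key) {
            return directory.locate(key)                   // step 2: find the owning TaskManager
                    .map(serversByTaskManager::get)        // pick that TaskManager's state server
                    .flatMap(server -> server.get(key));   // step 3: read the state for the key
        }
    }

    // Builds a two-TaskManager sample setup with invented word counts.
    static ClientProxy demoCluster() {
        StateServer tm1 = new StateServer();
        StateServer tm2 = new StateServer();
        tm1.put("flink", 3);
        tm2.put("state", 7);

        LocationDirectory directory = new LocationDirectory();
        directory.register("flink", "tm-1");
        directory.register("state", "tm-2");

        Map<String, StateServer> servers = new HashMap<>();
        servers.put("tm-1", tm1);
        servers.put("tm-2", tm2);
        return new ClientProxy(directory, servers);
    }

    public static void main(String[] args) {
        ClientProxy proxy = demoCluster();
        System.out.println(proxy.query("flink"));    // Optional[3]
        System.out.println(proxy.query("missing"));  // Optional.empty
    }
}
```

The point of the model is only that the client talks to a single proxy and never needs to know where a key lives; how Flink actually assigns keys to TaskManagers is not represented here.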
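3. Using queryable state in a concrete scenario
This section uses WordCount as the basis for querying state from a third-party program. Steps:
- Queryable state is disabled by default in Flink, so it has to be enabled before use
- Enable queryable state in the configuration file
- Copy the jar that queryable state depends on from the opt directory into the lib directory
- Restart Flink and check the startup log to confirm that the queryable state services have started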
3.1 Enabling queryable state
1. Edit the configuration file (conf/flink-conf.yaml)
queryable-state.enable: true
2. Copy the jar dependency into the lib directory
[root@flink flink-1.10.0]# pwd
/opt/install/flink-1.10.0
[root@flink flink-1.10.0]# cp opt/flink-queryable-state-runtime_2.11-1.10.0.jar lib
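Before restarting, it can be handy to double-check that the setting really made it into the configuration file. The following is a minimal sketch in plain Java, assuming the file consists of simple `key: value` lines without trailing comments; the class and method names are hypothetical, and this is not how Flink itself parses its configuration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical helper, not part of Flink.
class QueryableStateConfigCheck {

    // Returns true only if an uncommented line sets queryable-state.enable to true.
    static boolean isQueryableStateEnabled(List<String> configLines) {
        boolean enabled = false; // the feature is off unless the key is set
        for (String raw : configLines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int colon = line.indexOf(':');
            if (colon < 0) {
                continue; // not a key: value line
            }
            String key = line.substring(0, colon).trim();
            String value = line.substring(colon + 1).trim();
            if (key.equals("queryable-state.enable")) {
                enabled = Boolean.parseBoolean(value); // a later entry overrides an earlier one
            }
        }
        return enabled;
    }

    public static void main(String[] args) throws IOException {
        // Pass the path to flink-conf.yaml as the first argument.
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println("queryable state enabled: " + isQueryableStateEnabled(lines));
    }
}
```

With the installation shown above, the argument would be /opt/install/flink-1.10.0/conf/flink-conf.yaml.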
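3. Make the state queryable. There are two ways to expose state to an external system:
- Create a QueryableStateStream; this stream only acts as a sink that stores incoming data in queryable state
- Call stateDescriptor.setQueryable(String queryableStateName) to make the state behind that descriptor queryable
4. Implement it in code
Queryable State Stream: calling asQueryableState(stateName, stateDescriptor) on a KeyedStream returns a QueryableStateStream whose state values can be queried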
// ValueState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ValueStateDescriptor stateDescriptor)

// Shortcut for the explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)

// FoldingState
QueryableStateStream asQueryableState(
    String queryableStateName,
    FoldingStateDescriptor stateDescriptor)

// ReducingState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ReducingStateDescriptor stateDescriptor)
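The returned QueryableStateStream can be regarded as a sink and cannot be transformed further. Internally, the QueryableStateStream is translated into an operator that uses every incoming record to update the queryable state instance. The update logic is determined by the type of the StateDescriptor passed to asQueryableState. In the following program, every record of the keyed stream is applied to the state instance through ValueState.update(value):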
stream.keyBy(0).asQueryableState("query-name")
A complete WordCount job that exposes a ReducingState through asQueryableState:
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.scala._

object WordCountQueryableState {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream = environment.socketTextStream("flink.baizhiedu.com", 9999)

    // Reducing state: each incoming (word, 1) is added to the count stored for that word
    val reducingStateDescriptor = new ReducingStateDescriptor[(String, Int)](
      "reducingStateDescriptor",
      new ReduceFunction[(String, Int)] {
        override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) =
          (value1._1, value1._2 + value2._2)
      },
      createTypeInformation[(String, Int)])

    dataStream.flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .keyBy(0)
      // Expose the reducing state under this name; the stream ends here, like a sink
      .asQueryableState("wordCountqueryableStateName", reducingStateDescriptor)

    environment.execute("wordCountQueryableStateJob")
  }
}
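The word count job passes a ReducingStateDescriptor instead of relying on the value-state shortcut, and the difference in update behaviour is worth seeing side by side. The sketch below imitates both behaviours in plain Java with ordinary maps; it uses no Flink classes, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy comparison of the two update behaviours; not Flink code.
class QueryableUpdateSketch {

    // Value-state style: every record simply replaces what was stored for its key.
    static Map<String, Integer> countWithValueSemantics(String[] words) {
        Map<String, Integer> state = new HashMap<>();
        for (String word : words) {
            state.put(word, 1); // like update(value): the previous value is overwritten
        }
        return state;
    }

    // Reducing-state style: every record is merged into the stored value with a reduce function.
    static Map<String, Integer> countWithReducingSemantics(String[] words, BinaryOperator<Integer> reduce) {
        Map<String, Integer> state = new HashMap<>();
        for (String word : words) {
            state.merge(word, 1, reduce); // first record is stored as-is, later ones are reduced
        }
        return state;
    }

    public static void main(String[] args) {
        String[] words = "a b a".split("\\s+");
        System.out.println(countWithValueSemantics(words).get("a"));                   // 1
        System.out.println(countWithReducingSemantics(words, Integer::sum).get("a"));  // 2
    }
}
```

With value semantics a query for "a" would always see the latest record, so a running count needs the reducing variant (or a function that maintains the count itself).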
Managed Keyed State: managed keyed state can be made queryable by calling StateDescriptor.setQueryable(String queryableStateName) on its descriptor
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

class MyMapFunction extends RichMapFunction[(String, Int), (String, Int)] {
  var valueState: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    val valueStateDescriptor =
      new ValueStateDescriptor[Int]("valueStateDescriptor", createTypeInformation[Int])
    // Register the state under the name that external clients use when querying
    valueStateDescriptor.setQueryable("WordCountQueryableStateManagedKeyedStateName")
    valueState = getRuntimeContext.getState(valueStateDescriptor)
  }

  override def map(value: (String, Int)): (String, Int) = {
    // For a key without state, value() yields null, which Scala unboxes to 0
    val newValue = valueState.value() + value._2
    valueState.update(newValue)
    (value._1, newValue)
  }
}
import org.apache.flink.streaming.api.scala._

object WordCountQueryableStateManagedKeyedState {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream = environment.socketTextStream("flink.baizhiedu.com", 9999)

    dataStream.flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .keyBy(0)
      .map(new MyMapFunction)
      .print()

    environment.execute("WordCountQueryableStateManagedKeyedState")
  }
}
Querying State
1. Add the dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-client-java</artifactId>
    <version>1.10.0</version>
</dependency>
2. Write the Flink job that exposes the state
package com.baizhi.flink.queryablestate

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

object Teest {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    // Key by the word itself, so the client can query with a plain String key
    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    val result: DataStream[String] = keyedStream.map(new MyQueryableStateMapFunction)
    result.print()

    environment.execute("queryableStateJob")
  }
}

class MyQueryableStateMapFunction extends RichMapFunction[(String, Int), String] {
  var valueState: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    val valueStateDescriptor: ValueStateDescriptor[Int] =
      new ValueStateDescriptor[Int]("VSD", createTypeInformation[Int])
    // "suibianxie" is the name the client passes when it queries this state
    valueStateDescriptor.setQueryable("suibianxie")
    valueState = getRuntimeContext.getState(valueStateDescriptor)
  }

  override def map(value: (String, Int)): String = {
    val newCount: Int = valueState.value() + 1
    valueState.update(newCount)
    value._1 + " count is " + newCount
  }
}
3. Write the client that queries the state
package com.baizhi.flink.queryablestate;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

public class Test {
    public static void main(String[] args) throws Exception {
        // Host of a TaskManager and the port of its client proxy (9069 is the default)
        QueryableStateClient queryableStateClient = new QueryableStateClient("hadoop10", 9069);

        // ID of the running job; replace it with the ID of your own job
        JobID jobId = JobID.fromHexString("88c93ab332f0c16023a9c57b4f1718f1");
        // Must match the name passed to setQueryable in the job
        String queryableStateName = "suibianxie";
        String key = "a";

        TypeInformation<String> keyTypeInfo = TypeInformation.of(String.class);
        ValueStateDescriptor<Integer> stateDescriptor =
                new ValueStateDescriptor<>("vsd", TypeInformation.of(Integer.class));

        CompletableFuture<ValueState<Integer>> future =
                queryableStateClient.getKvState(jobId, queryableStateName, key, keyTypeInfo, stateDescriptor);

        // Block until the result arrives, then read the value
        ValueState<Integer> valueState = future.get();
        Integer count = valueState.value();
        System.out.println(count);

        queryableStateClient.shutdownAndWait();
    }
}
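The client above calls future.get() without a timeout, so it waits indefinitely and throws if the query fails. A caller such as the login check from the introduction will probably want a bounded wait and a default value instead. The sketch below shows one way to wrap that in plain Java; queryOrDefault is a hypothetical helper, and the Supplier stands in for a call like queryableStateClient.getKvState(...), so no Flink classes are needed to run it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper, not part of Flink's client API.
class QueryWithFallbackSketch {

    // Runs the query, waits at most timeoutMillis, and returns fallback on failure or timeout.
    static <T> T queryOrDefault(Supplier<CompletableFuture<T>> query, long timeoutMillis, T fallback) {
        try {
            return query.get().get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            // The query took too long or completed exceptionally
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
            return fallback;
        }
    }

    public static void main(String[] args) {
        // A completed future stands in for a successful state query.
        Integer count = queryOrDefault(() -> CompletableFuture.completedFuture(5), 1000L, 0);
        System.out.println(count); // 5

        // A failed future stands in for a query that could not be answered.
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("no state for key"));
        System.out.println(queryOrDefault(() -> failed, 1000L, 0)); // 0
    }
}
```

Whether a silent fallback is acceptable depends on the use case; for a risk check it may be safer to treat a failed query as "unknown" rather than as a count of zero.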