什么是Queryable State
简而言之,就是Flink将managed keyed(partitioned)state暴露给外部,从而用户可以从Flink外部访问state作业。
架构
1.QueryableStateClient,默认运行在Flink集群的外部,负责提交用户的查询请求。 2.QueryableStateClientProxy,运行在TaskManager上(Flink集群的内部),负责接收客户端的查询请求,从所负责的Task Manager获取请求的state,并返回给客户端。 3.QueryableStateServer,运行在Task Manager上,负责本地存储的state。
客户端连接到一个代理,获取特定的key对应的state,key state会按照key划分key groups,每个task manager会分配一些key groups。代理会询问job manager以找到key所属的key groups所在的task manager,根据返回的结果,代理将会运行在Task Manager上的QueryableStste Server查询key对应的state,并将结果返回给客户端。
如何使用Queryable state
1.将 flink-queryable-state-runtime_2.11-1.13.0.jar从Flink distribution的opt/目录拷贝到lib/目录 2.将参数queryable- state.enable设置为true,详细信息参考配置文档Configuration 验证集群queryable state已经被激活,可以检查任意task manager的日志中是否包含“Started the Queryable State Proxy Server @ …"。
将state设置为可查询的
激活集群的queryable state功能后,还要讲state设置为可查询的才能对外可见,有两种设置方式:
- 创建一个QueryStateStream,他会作为一个sink,并将输入数据转换为queryable state;
- 通过stateDescriptor.setQueryable(String queryableStateName)将state描述符所表示的keyed state设置可查询的。
第一种方式:Query State Stream
QueryableStateStream asQueryableState(
String queryableStateName,
ValueStateDescriptor stateDescriptor
)
QueryableState as QueryableState(String queryableStateName)
QueryableStateStream asQueryableState(
String queryableStateName,
ReducingStateDescriptor stateDescriptor
)
注意:没有可查询的ListState sink,因为这种情况下list会不断增长,并且可能不会被清理,最终会消耗大量的内存。 返回的QueryableStateStream被看作是一个sink,不可再进一步转换。在内部实现上,一个QueryableStateStream被转成一个operator,使用输入的数据来更新queryable state。state如何更新是由asQueryableState提供的StateDescriptor来决定。下面的代码中,keyed strean的所有数据将会通过ValueState.update(value)来更新状态:
stream.KeyBy(value -> value.f0).asQueryableState("query-name")
第二种方式:Managed Keyed State operator中Managed keyed state可以通过StateDescriptor.setQueryable(String queryableStateName)将State Descriptor将state descriptor设置成可查询的,从而使state可查询
ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
"average",
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
descriptor.setQueryable("query-name");
参数queryStateName的名称可以任意选取,并且只被用来进行查询,可以和state名称不同。 这种不会限制state类型,即任意的ValueState、ReduceState、ListState、MapState、AggreatingState以及已弃用的FlodingStste均可作为queryable state。 查询state 目前为止,你已经激活了集群的queryable state功能,并且将一些state设置成可查询的,接下来将会展示会如何进行查询。 为了进行查询,可以使用辅助类QueryableStateClient,这个类在flink-queryable-state-client的jar中,在项目的pom文件要显示添加对flink-queryable-state-client和flink-core的依赖,如下所示: xml org.apache.flink flink-core 1.13.0 org.apache.flink flink-queryable-state-client-java 1.13.0 关于依赖的信息,可参考配置Flink项目 QueryableStateClient将提交你的请求到内部代理,代理会处理请求并返回结果。客户端初始化只需要提供一个有效的Task Manager主机名(每个task manager上都运行一个queryable state代理),以及代理监听的端口号,关于配置端口号和代理可参考Configuration Section
QueryableStateClient client = new QueryableStateClient(tmHostname, pproxyPort);
客户端创建完成后,查询类型为k的key,以及类型为V的state,可以使用以下方法:
CompletableFuture<S> getKvState(
JobID jobId,
String queryableStateName,
K key,
TypeInformation<K> keyTypeInfo,
StateDescriptor<S, V> stateDescriptor
)
该方法会返回一个最终将包含state的query state实例,该实例可通过JobID和queryableStateName识别。在方法参数中,key用来指定所要查询的状态所属的key。keyTypeInfo告诉Flink对key进行序列化和反序列化。StateDescriptor包含了所请求state的必要信息,即state类型(Value,Reduce),即state的类型(Value, Reduce等等),以及如何对其进行序列和反序列。 细心的读者会注意到返回的future包含类型为S的值,即一个存储实际值的state对象。他可以是Flink支持的任何类型的state:ValueState、Reduce State、MapState、ListState、MapState、AggreatingState以及弃用的FloadingState。
注意:这些state对象不允许对其中的state进行修改。你可以通过valueState.get()获取实际的state,或者通过mapState.entries()遍历所有的值,但是不能修改他们。举例来说,对返回的list state调用add()方法会导致UnsupporedOperationException。 注意:客户端是异步的,并且可能被多个线程共享。客户端不再使用后需要通过QueryableStateClient.shutdown()来终止,从而释放资源。
public class WordCount{
public void main(String[] args) throws Exception{
StreamExceptionEnvironment env = StreamExecuptionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.setStateBackend(new RocksDBStateBaxkend("hdfs路径"));
env.socketTextStream("IP", 9999)
.flatMap(new FlatMapFunction<String, Tuple2<Long, Long>>(){
@Override
public void flatMap(String s, Collector<Tuple2<Long, Long>> collector) throws Exception {
collector.collect(new Tuple2<>(Long.parseLong(s), 1L));
}
}).KeyBy(0)
.flatMap(Wordnew CountWindowAverage())
.KeyBy(0)
.print();
env.execute(WordCount.class.getCanonicalName());
}
static class CountWindowAverage extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>{
@Override
public void open(Configuration config){
ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
"average",
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){});
descriptor.setQueryable("query123");
sum = getRuntimeContext().getState(descriptor);
}
@override
public void flatmap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception{
Tuple2<Long, Long> currentSum = sum.value();
if(currentSum == null) {
currentSum = new Tuple2<>(0L, 1L);
}
currentSum.f0 += 1;
currentSum.f1 += 1;
sum.update(currentSum);
if(currentSum.f0 >= 2){
out.collect(new Tuple2<>(input.f0, currentSum.f1));
}
}
}
}
|