IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink state(3)- Queryable State -> 正文阅读

[大数据]Flink state(3)- Queryable State

什么是Queryable State

简而言之,就是Flink将managed keyed(partitioned)state暴露给外部,从而用户可以从Flink外部访问state作业。

架构

1.QueryableStateClient,默认运行在Flink集群的外部,负责提交用户的查询请求。
2.QueryableStateClientProxy,运行在TaskManager上(Flink集群的内部),负责接收客户端的查询请求,从所负责的Task Manager获取请求的state,并返回给客户端。
3.QueryableStateServer,运行在Task Manager上,负责本地存储的state。

客户端连接到一个代理,获取特定的key对应的state,key state会按照key划分key groups,每个task manager会分配一些key groups。代理会询问job manager以找到key所属的key groups所在的task manager,根据返回的结果,代理将会运行在Task Manager上的QueryableStste Server查询key对应的state,并将结果返回给客户端。

如何使用Queryable state

1.将 flink-queryable-state-runtime_2.11-1.13.0.jar从Flink distribution的opt/目录拷贝到lib/目录
2.将参数queryable- state.enable设置为true,详细信息参考配置文档Configuration
验证集群queryable state已经被激活,可以检查任意task manager的日志中是否包含“Started the Queryable State Proxy Server @ …"。

将state设置为可查询的

激活集群的queryable state功能后,还要讲state设置为可查询的才能对外可见,有两种设置方式:

  • 创建一个QueryStateStream,他会作为一个sink,并将输入数据转换为queryable state;
  • 通过stateDescriptor.setQueryable(String queryableStateName)将state描述符所表示的keyed state设置可查询的。
    第一种方式:Query State Stream
// ValueState
QueryableStateStream asQueryableState(
	String queryableStateName,
	ValueStateDescriptor stateDescriptor
)

// 显式ValueStateDescriptor变量快捷方式
QueryableState as QueryableState(String queryableStateName)

// ReducingState
QueryableStateStream asQueryableState(
	String queryableStateName,
	ReducingStateDescriptor stateDescriptor
)

注意:没有可查询的ListState sink,因为这种情况下list会不断增长,并且可能不会被清理,最终会消耗大量的内存。
返回的QueryableStateStream被看作是一个sink,不可再进一步转换。在内部实现上,一个QueryableStateStream被转成一个operator,使用输入的数据来更新queryable state。state如何更新是由asQueryableState提供的StateDescriptor来决定。下面的代码中,keyed strean的所有数据将会通过ValueState.update(value)来更新状态:

stream.KeyBy(value -> value.f0).asQueryableState("query-name")

第二种方式:Managed Keyed State
operator中Managed keyed state可以通过StateDescriptor.setQueryable(String queryableStateName)将State Descriptor将state descriptor设置成可查询的,从而使state可查询

ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
	"average", // state的名字
	TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){})); // type information
	descriptor.setQueryable("query-name"); // query state name

参数queryStateName的名称可以任意选取,并且只被用来进行查询,可以和state名称不同。
这种不会限制state类型,即任意的ValueState、ReduceState、ListState、MapState、AggreatingState以及已弃用的FlodingStste均可作为queryable state。
查询state
目前为止,你已经激活了集群的queryable state功能,并且将一些state设置成可查询的,接下来将会展示会如何进行查询。
为了进行查询,可以使用辅助类QueryableStateClient,这个类在flink-queryable-state-client的jar中,在项目的pom文件要显示添加对flink-queryable-state-client和flink-core的依赖,如下所示:
xml org.apache.flink flink-core 1.13.0 org.apache.flink flink-queryable-state-client-java 1.13.0
关于依赖的信息,可参考配置Flink项目
QueryableStateClient将提交你的请求到内部代理,代理会处理请求并返回结果。客户端初始化只需要提供一个有效的Task Manager主机名(每个task manager上都运行一个queryable state代理),以及代理监听的端口号,关于配置端口号和代理可参考Configuration Section

QueryableStateClient client = new QueryableStateClient(tmHostname, pproxyPort);

客户端创建完成后,查询类型为k的key,以及类型为V的state,可以使用以下方法:

CompletableFuture<S> getKvState(
	JobID jobId,
	String queryableStateName,
	K key,
	TypeInformation<K> keyTypeInfo,
	StateDescriptor<S, V> stateDescriptor
)

该方法会返回一个最终将包含state的query state实例,该实例可通过JobID和queryableStateName识别。在方法参数中,key用来指定所要查询的状态所属的key。keyTypeInfo告诉Flink对key进行序列化和反序列化。StateDescriptor包含了所请求state的必要信息,即state类型(Value,Reduce),即state的类型(Value, Reduce等等),以及如何对其进行序列和反序列。
细心的读者会注意到返回的future包含类型为S的值,即一个存储实际值的state对象。他可以是Flink支持的任何类型的state:ValueState、Reduce State、MapState、ListState、MapState、AggreatingState以及弃用的FloadingState。

注意:这些state对象不允许对其中的state进行修改。你可以通过valueState.get()获取实际的state,或者通过mapState.entries()遍历所有的值,但是不能修改他们。举例来说,对返回的list state调用add()方法会导致UnsupporedOperationException。
注意:客户端是异步的,并且可能被多个线程共享。客户端不再使用后需要通过QueryableStateClient.shutdown()来终止,从而释放资源。

public class WordCount{
	public void main(String[] args) throws Exception{
		StreamExceptionEnvironment env = StreamExecuptionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(5000);
		env.setStateBackend(new RocksDBStateBaxkend("hdfs路径"));
		env.socketTextStream("IP", 9999)
		   .flatMap(new FlatMapFunction<String, Tuple2<Long, Long>>(){
		   @Override
		   public void flatMap(String s, Collector<Tuple2<Long, Long>> collector) throws Exception {
		   		collector.collect(new Tuple2<>(Long.parseLong(s), 1L));
		   }
		}).KeyBy(0)
		  .flatMap(Wordnew CountWindowAverage())
		  .KeyBy(0)
		  .print();
		env.execute(WordCount.class.getCanonicalName());
	}
	static class CountWindowAverage extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>{
		@Override
		public void open(Configuration config){
			ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
				"average", // state name
				TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}); // typeInfo
				descriptor.setQueryable("query123");
				sum = getRuntimeContext().getState(descriptor);
		}

		@override
		public void flatmap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception{
			Tuple2<Long, Long> currentSum = sum.value();
			if(currentSum == null) {
				currentSum = new Tuple2<>(0L, 1L);
			}

			currentSum.f0 += 1;
			currentSum.f1 += 1;
			sum.update(currentSum);

			if(currentSum.f0 >= 2){
				out.collect(new Tuple2<>(input.f0, currentSum.f1));
			}
		}
	}
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-12 23:31:26  更:2021-10-12 23:32:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 1:21:35-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码