IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink(九) 状态可查询 Queryable State -> 正文阅读

[大数据]Flink(九) 状态可查询 Queryable State

一、简介

Flink提供的有状态可查询机制,可以通过第三方程序读取到flink内部的状态数据
就是flink计算之后的结果(state),允许通过第三方应用程序查询到

1.1应用场景

用户在进行登录前需要同过flink中获取到状态来判断本次登录是否有风险

在这里插入图片描述
说明:
在这里插入图片描述

二、架构

在这里插入图片描述
说明:

  1. 在Flink的状态可查询的架构中,存在三个基本概念
    QueryableStateClient:第三方程序,不是flink架构中的内容
    QueryableStateClientProxy:flink架构中的一部分,用来处理客户端的请求
    QueryableStateServer:flink架构中的一部分,查询状态服务端(可查询的状态都在这里面)
  2. flink状态可查询的执行
    客户端发送状态可查询请求给taskManager中的QueryableStateClientProxy
    通过key查询对应的状态数据
    queryableStateClientProxy根据key到jobManager中获取到这个key对应的状态存储在哪个
    taskmanager上面
    根据key到指定的taskmanager上面的queryableStateServer中获取到这个key对应的状态
    在这里插入图片描述
    在这里插入图片描述

三、具体的场景使用

这里以wordCount为基础完成状态的第三方查询功能
步骤:

  1. 由于Flink中默认的状态可查询时关闭的,所以使用之前需要进行开启
  2. 在配置文件中开启状态可查询
  3. 将opt目录下的状态可查询说依赖的jar包放到lib目录下面
  4. 重启flink,并在启动日志中查看

3.1 启动状态可查询

1.修改配置文件

queryable-state.enable: true

2.复制jia依赖到lib目录下

[root@flink flink-1.10.0]# pwd
/opt/install/flink-1.10.0
[root@flink flink-1.10.0]# cp opt/flink-queryable-state-runtime_2.11-1.10.0.jar lib

3.设置状态可查询,两种方式让state在外部系统中可查询

  1. 创建QueryableStateStream,该Stream只是充当一个sink,将数据存储到queryablestate中
  2. 通过stateDescriptor.setQueryable(String queryableStateName)方法,将state可查

4.在代码中具体实现
Queryable State Stream:
通过KeyedStream对象的asQueryableState(stateName, stateDescriptor)方法,可以得到一个
QueryableStateStream对象,这个对象提供的状态值是可查询的

// ValueState
QueryableStateStream asQueryableState(
String queryableStateName,
ValueStateDescriptor stateDescriptor)
// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)
// FoldingState
QueryableStateStream asQueryableState(
String queryableStateName,
FoldingStateDescriptor stateDescriptor)
// ReducingState
QueryableStateStream asQueryableState(String queryableStateName,
ReducingStateDescriptor stateDescriptor)

返回的QueryableStateStream可视为sink,无法进一步转换。在内部,将QueryableStateStream转换为一个
operator,这个operator将所有传入记录用来更新queryable state实例。更新逻辑在调用asQueryableState方法
时传递的StateDescriptor参数对象中完成。在如下程序中,Keyed Stream的所有记录在底层都是通过value
state.update(value)更新状态实例:

stream.keyBy(0).asQueryableState("query-name")

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.
_
object WordCountQueryableState {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = environment.socketTextStream("flink.baizhiedu.com"
,9999)
var reducingStateDescriptor=new ReducingStateDescriptor[(String,Int)]
("reducingStateDescriptor"
,new ReduceFunction[(String,Int)] {
override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int)
= {
(value1.
_
1,(value1.
_
2+value2.
_
2))
}
},createTypeInformation[(String,Int)])
dataStream.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.asQueryableState("wordCountqueryableStateName"
,reducingStateDescriptor)
environment.execute("wordCountQueryableStateJob")
}
}

Managed Keyed State
可以通过StateDescriptor.setQueryable(String queryableStateName)方法实现managed keyed State状态可查询

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.
_
class MyMapFunction extends RichMapFunction[(String,Int),(String,Int)]{
var valueState:ValueState[Int]=
_
override def open(parameters: Configuration): Unit = {
val runtimeContext = getRuntimeContext
var valueStateDescriptor=new ValueStateDescriptor[Int]
("valueStateDescriptor"
,createTypeInformation[Int])
valueStateDescriptor.setQueryable("WordCountQueryableStateManagedKeyedStateName")
valueState=runtimeContext.getState(valueStateDescriptor)
}
override def map(value: (String, Int)): (String, Int) = {
val oldValue = valueState.value()
var newValue = valueState.update(oldValue+value._2)
(value._1,valueState.value())
}
}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.
_
object WordCountQueryableStateManagedKeyedState {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = environment.socketTextStream("flink.baizhiedu.com"
,9999)
dataStream.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.map(new MyMapFunction)
.print()
environment.execute("WordCountQueryableStateManagedKeyedState")
}
}

Querying State
1.引入依赖

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-client-java</artifactId>
<version>1.10.0</version>
</dependency>
package com.baizhi.flink.queryablestate

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._


//通过word count业务看状态可查询的应用
object Teest {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    val result: DataStream[String] = keyedStream.map(new MyQueryableStateMapFunction)

    result.print()


    environment.execute("queryableStateJob")
  }

}

class MyQueryableStateMapFunction extends RichMapFunction[(String, Int),String]{

  var valueState:ValueState[Int]=_


  override def open(parameters: Configuration): Unit = {

    var valueStateDescriptor:ValueStateDescriptor[Int]=new ValueStateDescriptor[Int]("VSD",createTypeInformation[Int])

    //通过状态描述者提供的方法,可以完成状态可查询的设置
    valueStateDescriptor.setQueryable("suibianxie")


    valueState=getRuntimeContext.getState(valueStateDescriptor)
  }

  override def map(value: (String, Int)): String = {

    //从状态中读取数据
    val oldCOunt: Int = valueState.value()
    valueState.update(oldCOunt+1)

    value._1+"个数是"+valueState.value()
  }
}

package com.baizhi.flink.queryablestate;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.net.UnknownHostException;
import java.util.concurrent.CompletableFuture;

public class Test {

    public static void main(String[] args) throws Exception {
        //到flink服务器内部读取状态数据

        //创建queryableStateClient对象
        QueryableStateClient queryableStateClient = new QueryableStateClient("hadoop10",9069);

        JobID jobId=JobID.fromHexString("88c93ab332f0c16023a9c57b4f1718f1");//运行的job的唯一标记
        String queryableStateName="suibianxie";//可查询的状态的名字
        String key="a";//要查询的key
        TypeInformation<String> keyTypeInfo=TypeInformation.of(String.class);//key的类型信息

        //声明一个状态描述者,这个状态描述者,根据flink服务器中可查询的状态类型进行定义的
        ValueStateDescriptor<Integer> stateDescriptor=new ValueStateDescriptor<Integer>("vsd",TypeInformation.of(Integer.class));

        CompletableFuture<ValueState<Integer>> future = queryableStateClient.getKvState(jobId, queryableStateName, key, keyTypeInfo, stateDescriptor);

        ValueState<Integer> valueState = future.get();
        Integer count = valueState.value();
        System.out.println(count);
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-18 12:46:21  更:2021-08-18 12:46:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:09:37-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码