Flink (Part 2): the DataStream API and State Management
Stream (DataStream API)
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html
DataSource: Data Sources
A data source is where a program reads its input from. A source is added to a program with StreamExecutionEnvironment.addSource(sourceFunction). Flink ships with many built-in source functions, and users can also write their own: implement the SourceFunction interface for a non-parallel source, or implement ParallelSourceFunction (or extend RichParallelSourceFunction) for a parallel source.
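As a minimal sketch of a custom non-parallel source (the class name and the emitted data are illustrative assumptions, not part of the original notes):
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

// hypothetical non-parallel source: emits one line per second until cancelled
class MyWordSource extends SourceFunction[String] {
  @volatile private var running = true
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (running) {
      ctx.collect("hello flink")   // hand one element to the stream
      Thread.sleep(1000)
    }
  }
  override def cancel(): Unit = running = false
}
// usage: environment.addSource(new MyWordSource).print()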
Applications of the various data sources
File-based (for general awareness)
- readTextFile(path) - Reads text files, i.e. files that respect the
TextInputFormat specification, line-by-line and returns them as Strings.
- Add the Hadoop dependency
- Read a file from HDFS
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.10.0</version>
</dependency>
//1. Create the execution environment
val environment = StreamExecutionEnvironment.getExecutionEnvironment
//2. Obtain the data source
val text = environment.readTextFile("hdfs://flink.baizhiedu.com:8020/flink/flink-words")
//3. Transform the data
val result = text.flatMap(line => line.split("\\s+"))
.map(word => (word, 1))
.keyBy(0)
.sum(1)
//4. Print the result
result.print()
//5. Execute the job
environment.execute("myFlinkJob")
Note:
hdfs://flink.baizhiedu.com:8020/flink/flink-words
Socket-based
- socketTextStream - Reads from a socket. Elements can be separated by a delimiter.
val text = environment.socketTextStream("flink.baizhiedu.com",9999)
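A minimal sketch of the overload with an explicit delimiter and connection-retry count (the values are illustrative; the optional parameters are my assumption about the Scala API of this Flink version):
// split records on '\n' and retry the connection up to 3 times if it drops
val lines = environment.socketTextStream("flink.baizhiedu.com", 9999, '\n', 3)
lines.print()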
Read from Apache Kafka☆
previously on Zookeeper&Kafka
https://zookeeper.apache.org/doc/current/zookeeperStarted.html
1. Start ZooKeeper
[root@flink apache-zookeeper-3.5.7-bin]# bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/install/apache-zookeeper-3.5.7-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
2. Connect to ZooKeeper
[root@flink apache-zookeeper-3.5.7-bin]# bin/zkCli.sh -server 127.0.0.1:2181
http://kafka.apache.org/quickstart
1. Start Kafka
[root@flink kafka_2.11-2.2.0]# bin/kafka-server-start.sh config/server.properties
2. Create a topic
[root@flink kafka_2.11-2.2.0]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
3. List topics
[root@flink kafka_2.11-2.2.0]# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test
4. Produce messages
[root@flink kafka_2.11-2.2.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topica
5. Consume messages
[root@flink kafka_2.11-2.2.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic01 --from-beginning
Flink Integration with Kafka
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html
- Add the Maven dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
- Applications of the various DataSources
- SimpleStringSchema: SimpleStringSchema only deserializes the value.
object QuickStart {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "flink.baizhiedu.com:9092")
var text = environment
.addSource(new FlinkKafkaConsumer[String]("topic01", new SimpleStringSchema(), properties));
val result = text.flatMap(line => line.split("\\s+"))
.map(word => (word, 1))
.keyBy(0)
.sum(1)
result.print()
environment.execute("myFlinkJob")
}
}
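A practical note (an assumption based on typical Kafka consumer configuration, not part of the original snippet): the consumer is normally also given a consumer group, e.g.
// "flink-wordcount" is a hypothetical consumer group name
properties.setProperty("group.id", "flink-wordcount")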
- **KafkaDeserializationSchema**: by implementing this interface you can deserialize the key, value, partition, offset and other record metadata.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.flink.api.scala._
class MyKafkaDeserializationSchema extends KafkaDeserializationSchema[(String,String,Int,Long)]{
override def isEndOfStream(t: (String, String, Int, Long)): Boolean = false;
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long) = {
if(consumerRecord.key()!=null){
(new String(consumerRecord.key()),new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
}else{
(null,new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
}
}
override def getProducedType: TypeInformation[(String, String, Int, Long)] = {
createTypeInformation[(String, String, Int, Long)];
}
}
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
object QuickStart {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "flink.baizhiedu.com:9092")
var text = environment
.addSource(new FlinkKafkaConsumer[(String,String,Int,Long)]("topic01", new MyKafkaDeserializationSchema(), properties));
val result = text.flatMap(line =>line._2.split("\\s+"))
.map(word => (word, 1))
.keyBy(0)
.sum(1)
result.print()
environment.execute("myFlinkJob")
}
}
- JSONKeyValueDeserializationSchema: a class shipped with the flink-kafka connector that can be used directly; it requires both the key and the value in the Kafka topic to be JSON. It also lets you choose whether to include record metadata (topic, partition, offset, etc.).
import java.util.Properties
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
import org.apache.flink.api.scala._
// the object is named differently from the imported class so it does not shadow JSONKeyValueDeserializationSchema
object JSONKeyValueDeserializationSchemaJob {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
var properties: Properties = new Properties()
properties.setProperty("bootstrap.servers", "flink.baizhiedu.com:9092")
val text = environment
.addSource(new FlinkKafkaConsumer[ObjectNode]("topic05",new JSONKeyValueDeserializationSchema(false),properties));
text.map(t=>(t.get("value").get("id").asInt(),t.get("value").get("name").asText())).print()
environment.execute("myFlinkJob")
}
}
[root@flink kafka_2.11-2.2.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic05
>{"id":101,"name":"xiaohei"}
Note
- Pay attention to the import: import org.apache.flink.api.scala._
- The data in Kafka must be in JSON format
Operators
Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/
| Operator | Description |
| --- | --- |
| map | one-to-one mapping |
| flatMap | mapping (flatten) |
| filter | filtering |
| keyBy | grouping; the result is a KeyedStream; the keyBy operator turns a DataStream into a KeyedStream |
val dataStream2: DataStream[Array[String]] = dataStream.map(_.split("\\s+"))
dataStream2.print()
val value: DataStream[String] = dataStream2.map(e => e(0) + "***" + e(1))
value.print()
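The map example above can be combined with the other operators from the table; a minimal sketch (assuming dataStream carries lines of whitespace-separated words):
val counts = dataStream
  .flatMap(_.split("\\s+"))   // flatMap: split each line into words
  .filter(_.nonEmpty)         // filter: drop empty tokens
  .map(word => (word, 1))     // map: pair each word with a count of 1
  .keyBy(0)                   // keyBy: group by word -> KeyedStream
  .sum(1)                     // running count per word
counts.print()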
- Understanding KeyedStream
In Flink, data is stateful, and state is very often used together with a KeyedStream as keyed state: each key maps to its own state region (the same key always sees the same state, different keys see different state).
DataSink: Data Output
Several output targets are supported: print, files (HDFS), Redis, Kafka, and so on.
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html#data-sinks
In production, flink-connector-filesystem is typically used to write results to an external file system (a hedged Kafka-sink sketch also follows the file-sink example below).
- Add the flink-connector-filesystem dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.10.0</version>
</dependency>
- Code implementation
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.scala._
object FileDataSinkFlinkConnectorFileSystem {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val text = environment.socketTextStream("flink.baizhiedu.com",9999)
var streamFileSink = StreamingFileSink.forRowFormat(new Path("hdfs://flink.baizhiedu.com:8020/flink-result"),
new SimpleStringEncoder[(String,Int)]())
.withBucketAssigner(new DateTimeBucketAssigner[(String, Int)]("yyyy-MM-dd"))
.build();
val result = text.flatMap(line => line.split("\\s+"))
.map(word => (word, 1))
.keyBy(0)
.sum(1)
result.addSink(streamFileSink)
environment.execute("myFlinkJob")
}
}
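For completeness, a hedged sketch of writing the same result stream to Kafka instead of the file system (the topic name result-topic is an assumption; it reuses the flink-connector-kafka dependency introduced earlier):
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "flink.baizhiedu.com:9092")
// serialize each (word, count) tuple as a plain string before handing it to Kafka
val kafkaSink = new FlinkKafkaProducer[String]("result-topic", new SimpleStringSchema(), producerProps)
result.map(t => t._1 + "," + t._2).addSink(kafkaSink)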
State Management
Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html
Flink is a state-based stream processing engine.
Flink has two basic kinds of state: Keyed State and Operator State. Keyed State can only be used in operations on a KeyedStream, and every keyed operator is bound to one or more state values. Operator State is also called non-keyed state; every operator has its own operator state.
Both Keyed State and Operator State can be stored in two forms: managed and raw.
Managed state means Flink controls the data structures that hold the state, for example internal hash tables or RocksDB. Because of this, Flink can do memory optimization and failure recovery on top of managed state.
Raw state means Flink only sees the state as a byte array and knows nothing else about it; users must serialize and deserialize the state themselves. As a result, Flink cannot do memory optimization or failure recovery for raw state, so raw state is rarely used in practice.
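As a side note on where managed state physically lives, a minimal hedged sketch of configuring a state backend (the HDFS checkpoint path is a placeholder; FsStateBackend is one of the built-in backends, and RocksDB would additionally require the flink-statebackend-rocksdb dependency):
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// keep managed state snapshots on HDFS (placeholder path)
env.setStateBackend(new FsStateBackend("hdfs://flink.baizhiedu.com:8020/flink/checkpoints"))
env.enableCheckpointing(5000) // checkpoint every 5 seconds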
Managed Keyed State (must master)☆
The managed keyed state interfaces provide access to states of different data types, all scoped to a key. This means managed keyed state can only be used on a KeyedStream. Flink has the following built-in kinds of managed keyed state:
| Type | Use case | Methods |
| --- | --- | --- |
| ValueState<T> | Stores a single value | update(T), T value(), clear() |
| ListState<T> | Stores a list of values | add(T), addAll(List<T>), Iterable<T> get(), update(List<T>), clear() |
| MapState<UK, UV> | Stores a map of key/value pairs | put(UK, UV), putAll(Map<UK, UV>), get(UK), entries(), keys(), values(), clear() |
| ReducingState<T> | Stores a single value; each added element is automatically combined with the previous state using a user-provided ReduceFunction | add(T), T get(), clear() |
| AggregatingState<IN, OUT> | Stores a single value; each added element is automatically combined with the previous state using a user-provided AggregateFunction; unlike ReducingState, the input and output types can differ | add(IN), OUT get(), clear() |
| FoldingState<T, ACC> | Stores a single value; each added element is automatically combined with the previous state using a user-provided FoldFunction; unlike ReducingState, the input type and the intermediate result type can differ | add(T), T get(), clear() |
These states are obtained through the RuntimeContext with the matching descriptor:
ValueState<T> getState(ValueStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
Overall approach for the code implementation☆☆☆
- Write a class that extends RichMapFunction
- Override the open method of RichMapFunction
  In open, obtain an XxxState object via the RuntimeContext's getXxxState(XxxStateDescriptor) method
- Implement the map method of RichMapFunction
  In map, use the XxxState object to implement the required business logic
- Apply the custom MapFunction to the KeyedStream
ValueState
Implement word count
package com.baizhi.flink.state
import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
object ValueStateJob {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)
val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream
.flatMap(_.split("\\s+"))
.map((_, 1))
.keyBy(0)
val result: DataStream[String] = keyedStream.map(new MyMapFunction)
result.print()
environment.execute("ValueStateJob")
}
}
class MyMapFunction extends RichMapFunction[(String, Int),String]{
var valueState:ValueState[Int]=_
override def open(parameters: Configuration): Unit = {
val runtimeContext: RuntimeContext = getRuntimeContext
var valueStateDescriptor:ValueStateDescriptor[Int]=new ValueStateDescriptor[Int]("valueState",createTypeInformation[Int])
valueState=runtimeContext.getState(valueStateDescriptor)
}
override def map(value: (String, Int)): String = {
val oldCount: Int = valueState.value()
val newCount: Int = oldCount + value._2
valueState.update(newCount)
value._1+"==的数量是==>"+newCount
}
}
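To try it out, start a plain socket server on hadoop10 (the host used in the code), for example with netcat (`nc -lk 9999`, assuming netcat is installed), and type whitespace-separated words; the job prints the running count for each word.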
Summary
- Keyed state (ValueState / ListState / MapState / ReducingState / AggregatingState) can only be used on a KeyedStream. Keyed state is state bound to a key: every key has its own state, and different keys map to different states.
- To use state it first has to be created, via the methods the RuntimeContext provides:
  - obtain the RuntimeContext object through the method Flink provides (getRuntimeContext)
  - the method that creates the state object needs a StateDescriptor (a descriptor that says what type of data the created state can hold), created with the new keyword
- In the map method, use the state to store and read data as the business logic requires.
ListState
Track the product categories each user has browsed
package com.baizhi.flink.state
import java.lang
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import scala.collection.JavaConverters._
object ListStateJob {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)
val keyedStream: KeyedStream[(String, String, String), Tuple] = dataStream
.map(_.split("\\s+"))
.map(array => (array(0), array(1), array(2)))
.keyBy(0)
val result: DataStream[(String, String)] = keyedStream.map(new MyListStateMapFunction)
result.print()
environment.execute("ListStateJob")
}
}
class MyListStateMapFunction extends RichMapFunction[(String, String, String),(String,String)]{
var listState:ListState[String]=_
override def open(parameters: Configuration): Unit = {
listState=getRuntimeContext.getListState(new ListStateDescriptor[String]("lsd",createTypeInformation[String]))
}
override def map(value: (String, String, String)): (String, String) = {
val oldIterable: lang.Iterable[String] = listState.get()
val scalaList: List[String] = oldIterable.asScala.toList
val list: List[String] = scalaList :+ value._3
val distinctList: List[String] = list.distinct
listState.update(distinctList.asJava)
(value._1+":"+value._2,distinctList.mkString(" | "))
}
}
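For reference, this job expects three whitespace-separated fields per line; judging from how the fields are used (array(0) as the key, array(2) as the category), a plausible sample input (field meanings and names are my assumption) is:
user01 zhangsan electronics
user01 zhangsan books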
MapState
Count the product categories each user has browsed and how many times each category was viewed.
The core update logic looks like this (a sketch of the map method; the complete job follows):
var count = 1;
if(mapState.contains(value._2)){
count=mapState.get(value._2)+1
}
mapState.put(value._2,count)
val nowData: List[String] = mapState.entries().asScala.map(entry=>entry.getKey+"->"+entry.getValue).toList
(value._1,nowData.mkString(" | "))
package day2
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import scala.collection.JavaConverters._
object MapStateJob {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)
val keyedStream: KeyedStream[(String, String), Tuple] = dataStream.map(_.split("\\s+"))
.map(words => (words(0) + ":" + words(1), words(2)))
.keyBy(0)
val result: DataStream[String] = keyedStream.map(new MyMapMapFunction)
result.print()
environment.execute("MapStateJob")
}
}
class MyMapMapFunction extends RichMapFunction[(String,String),String]{
var mapState:MapState[String,Int]=_
override def open(parameters: Configuration): Unit = {
mapState=getRuntimeContext.getMapState(new MapStateDescriptor[String,Int]("MapStateDescriptor",createTypeInformation[String],createTypeInformation[Int]))
}
override def map(value: (String, String)): String = {
var category:String = value._2
var count:Int=0
if(mapState.contains(category)){
count=mapState.get(category)
}
mapState.put(category,count+1)
val list: List[String] = mapState.entries().asScala.map(entry => entry.getKey + ":" + entry.getValue).toList
val str: String = list.mkString(" | ")
value._1+"--->"+str
}
}
ReducingState
(stores a single value and aggregates automatically; the input type and the output type must be the same)
Implement word count with automatic aggregation
package day2
import org.apache.flink.api.common.functions.{ReduceFunction, RichMapFunction, RuntimeContext}
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
object ReducingStateJob {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)
val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream.flatMap(_.split("\\s+"))
.map((_, 1))
.keyBy(0)
val result: DataStream[String] = keyedStream.map(new MyReducingMapFunction)
result.print()
environment.execute("ReducingStateJob")
}
}
class MyReducingMapFunction extends RichMapFunction[(String,Int),String]{
var reducingState:ReducingState[Int]=_
override def open(parameters: Configuration): Unit = {
val context: RuntimeContext = getRuntimeContext
val name:String="ReducingStateDescriptor"
val typeInfo:TypeInformation[Int]=createTypeInformation[Int]
val reduceFunction: ReduceFunction[Int] = new ReduceFunction[Int] {
override def reduce(value1: Int, value2: Int): Int = {
value1+value2
}
}
var reducingStateDescriptor:ReducingStateDescriptor[Int]=new ReducingStateDescriptor[Int](name,reduceFunction,typeInfo)
reducingState=context.getReducingState(reducingStateDescriptor)
}
override def map(value: (String, Int)): String = {
reducingState.add(value._2)
value._1+":"+reducingState.get()
}
}
AggregatingState
(stores a single value and aggregates automatically; the input and output types may differ, and an intermediate accumulator type can be used during the computation)
Compute each user's average order amount
package day2
import org.apache.flink.api.common.functions.{AggregateFunction, RichMapFunction}
import org.apache.flink.api.common.state.{AggregatingState, AggregatingStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
object AggregatingStateJob {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)
val keyedStream: KeyedStream[(String, Double), Tuple] = dataStream.map(_.split("\\s+"))
.map(words => (words(0) + ":" + words(1), words(2).toDouble))
.keyBy(0)
val result: DataStream[String] = keyedStream.map(new MyAggregateMapFunction)
result.print()
environment.execute("AggregatingStateJob")
}
}
class MyAggregateMapFunction extends RichMapFunction[(String,Double),String]{
var aggregatingState:AggregatingState[Double,Double]=_
override def open(parameters: Configuration): Unit = {
var name:String="aggregatingStateDescriptor"
var aggFunction:AggregateFunction[Double,(Double,Int),Double]=new AggregateFunction[Double,(Double,Int),Double] {
override def createAccumulator(): (Double, Int) = (0,0)
override def add(value: Double, accumulator: (Double, Int)): (Double, Int) = (accumulator._1+value,accumulator._2+1)
override def getResult(accumulator: (Double, Int)): Double = accumulator._1/accumulator._2
override def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) = (a._1+b._1,a._2+b._2)
}
var accType:TypeInformation[(Double,Int)]=createTypeInformation[(Double,Int)]
var aggregatingStateDescriptor:AggregatingStateDescriptor[Double,(Double,Int),Double]= new AggregatingStateDescriptor[Double,(Double,Int),Double](name,aggFunction,accType)
aggregatingState=getRuntimeContext.getAggregatingState[Double,(Double,Int),Double](aggregatingStateDescriptor)
}
override def map(value: (String, Double)): String = {
aggregatingState.add(value._2)
val avg: Double = aggregatingState.get()
value._1+"的订单平均金额:"+avg
}
}
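For reference, this job also reads three whitespace-separated fields per line; based on the key words(0) + ":" + words(1) and the amount words(2).toDouble, a plausible sample input (field meanings are my assumption) is something like `user01 zhangsan 100.0`, and the job prints the running average order amount per user.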