一.定义 Flink是一个分布式计算框架,可以处理海量数据,既可以离线批处理,也可以做实时流处理。主要是用于实时流处理。 flink实时流处理的优势可以归纳为三点: ①低延迟 ②高吞吐 ③支持精确一次 从上图,可以看出flink可以接受多种数据源数据,比如socket,file,Kafka数据源等,然后通过flink处理计算。此外,flink也支持和主流的资源调度框架的整合,比如k8s,yarn,mesos,使得对整个集群的资源调度更为合理。最后flink可以将结果存到其他的应用系统中,比如hbase,mysql,es等
二.flink的技术亮点 1.在目前所有的实时流计算框架中,flink是唯一一个同时支持低延迟、高吞吐、精确一次语义的实时流计算框架 2.flink既支持事件事件(Event Time),也支持处理时间(Process Time)之前的实时流计算框架都是基于Process Time来处理数据的。Process Time可以理解为:以数据到达计算框架被处理的时间戳为Process Time。 有些时候,可能由于网络波动、网络故障或其他的服务器故障导致实际发送的数据的顺序和实际到达的顺序不一样,导致最终结果不正确性。 Flink可以基于事件时间来进行处理 3.Flink支持状态编程 flink允许用户在实时流计算过程中,获取计算的中间状态,从而实现比较复杂的业务场景。 4.Flink支持精确一次语义 flink通过轻量级分布式快照机制,能够确保数据不丢失,并且能够精确处理 5.Flink实现了独立的内存管理机制 可以说Flink是一种内存计算框架,Flink实现了独立的内存管理机制,减少了JVMGC的依赖,从而降低了JVMGC可能带来的负面影响。
三.Flink-Source 1.CollectSource 2.FileSource 3.SocketSource 4.KafkaSource(下面讲解)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers","kafka01:9092,kafka02:9092,kafka03:9092")
properties.setProperty("zookeeper.connect","zk1:2181,zk2:2181,zk3:2181")
val consumer=new FlinkKafkaConsumer011[String]("topicsc",new SimpleStringSchema(),properties)
val source = env.addSource(consumer)
env.execute()
source.print()
四.Flink API Flink的模块可以分为三个模块: 1.DataSource模块:用于指定flink的数据源,使得flink可以获取 2.Transformation模块(Flink API):用于定义flink对数据的处理逻辑。 3.DataSink模块。用于指定flink的输出目的地,使得flink可以将计算结果输出到其他的应用系统中
Flink API可以分为两类: 1.DataStream API,用于实时流处理的API 2.Data Set,用于离线处理的API
Flink既可以做离线处理,也可以做实时处理,但flink最擅长的就是实时流处理.。 实时系列: ①map, ②flatmap, ③filter, ④keyBy(将Stream指定key并根据key的散列值进行分区), ⑤reduce, ⑥Aggregations(是DataStream结构提供的聚合算子,根据指定的字段进行聚合操作,滚动地产生一系列数据聚合结果。其实是将Reduce算子中的函数进行了封装,封装的聚合操作有sum、min、max,这样就不需要用户自己定义Reduce函数。),
val result = source.flatMap(line=>line.split(" "))
.map(world=>(word,1))
.keyBy(0)
.timeWindow(Time.of(5,TimeUnit.SECONDS))
.reduce((a,b))
⑦union, ⑧split&select(func) ⑨connect
五.Flink的并发度(并行度)概念及设置使用 首先先明确Flink的Slot的概念 1.Slot槽位,初学时类比于Yarn中的Container容器概念。是对资源(内存+cpu核数)的一种限定和划分。 2.Slot是针对TaskManager而言的,默认情况下,每个TaskManager的Slot的数量=其CPU核数总和 3.Slot对应TaskManager的资源是均分策略 4.通过设置Flink的并行度,从而提高Flink的开发处理能力。可以认为Flink的并行度就是分区数。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val source = env.socketTextStream("hadoop01",8888)
val result = source.flatMap(line=>line.split(" ")).map(word=>(word,1))
补充:如果在idea启动,默认并行度等于电脑核数 小结:可以将flink的并行度看做分区,并行度越高,分区数越多,则flink的并行效率越高
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("hadoop01",8888)
val result = source.flatMap(line=>line.split(" ")).setParallelism(2).map(word=>(word,1)).setParallelism(3)
result.print().setParallelism(1)
Flink的并行度设置分为4个层面,算子层面(代码算子后面中写),执行环境层面(env.setParallelism(2)),客户端层面(提交任务页面),系统层面(配置文件中配置)–优先级从前往后,算子优先级最高
六.Flink的窗口计算 Flink的窗口计算,指的是window操作,需要注意的是:窗口计算必须是基于KeyedStream来操作的,即必须在keyBy之后进行窗口计算,在一个窗口内针对同一个key的数据进行处理。 Flink的窗口计算分为: 1.滚动窗口:窗口之间数据没有重叠部分,比如设定窗口的大小=5s 2.滑动窗口:需要指定一个滑动区间以及窗口大小,每个一段时间计算一个窗口数据。即窗口与窗口之间存在重叠数据。比如窗口大小=10s,滑动区间=5s 3.计数窗口,当时间key的数量达到指定计数操作的时候时,触发后续计算:当相同key的数量达到指定计数操作时,触发后续计算
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("hadoop01",8888)
val wordcount = source.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
wordcount.print()
env.execute()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("hadoop01",8888)
val wordcount = source.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.timeWindow(Time.seconds(10),Time.seconds(5))
.sum(1)
wordcount.print()
env.execute()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("hadoop01",8888)
val wordcount = source.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.countWindow(4)
.sum(1)
wordcount.print()
env.execute()
如果想进入自定义处理逻辑,需要使用ProcessWindowFunction。 模拟场景:对每个窗口内的数据,对相同key的数据做升序
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("hadoop01",8888)
val result= source.map(line=>line.split(","))
.map(arr=>(arr(0),arr(1).toInt))
.keyBy(_._1)
.timeWindow(Time.seconds(10))
.process(new MyWindowProcess )
result.print()
env.execute()
class MyWindowProcess extends ProcessWindowFunction[(String,Int),String,String,TimeWindow]{
override def process(key:String, context:Context, elements: Iterable[(String,Int)], out:Collecter[String]):
println("key:" + key + "排序后的集合:" + elements.toList.sortBy(x=>x._2>))
out.collect("")
}
输出结果:
七.Flink Sink 支持(代码百度中有) 1.FileSink 2.KafkaSink 3.HBaseSink 4.JDBC Sink 5.ES Sink
八.Flink的EventTime时间时间与WaterMark(水位线) Flink支持三种时间语义,分别是: 1.Event Time,事件时间,即数据产生的时间 2.Ingestion Time,接收时间,即数据被FlinkSource组件接收的时间 3.Processing Time,处理时间,即数据被Flink API处理的时间
Flink在流处理应用中,EventTime和Processing Time较多,ingestion Time属于折中方案,用的较少,不做讨论。
综上我们会发现,使用Process Time潜在的问题是:处理的顺序和实际发生的顺序可能是不一致的,导致最终结果是不正确。又或者说,在上述场景中,我们希望等一等迟到的数据。
如果要想处理迟到的数据的乱序问题,就需要使用EventTime和Watermark机制。
EventTime和Watermark机制,需要: 1.设定时间语义:env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 2.设置一个最大的延迟时间 3.需要结合使用窗口函数 4.产生的数据中需要带有时间戳
Watermark=当前最大的事件时间-设定的最大延迟时间 Watermark称为水位线,如果Watermark越过了窗口对应的结束时间窗口才会关闭和进行计算 综上,我们可以发现使用EventTime和Watermark可以解决迟到数据以及乱序的问题。但不一定能够百分百完美解决。就比如4在6时间戳后面,则4进入不到其对应的窗口,被丢弃处理。 所以使用eventTime和Watermark,使得对迟到数据以及乱序问题的处理提供了可能。 当然也需要衡量延迟与准确性,延迟越大,准确性越高。具体设定为多大的延迟,没有一个固定的标准。
def main(arg: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source = env.socketTextStream("ip01",8888)
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String]){
var maxEventTime = 0L
var watermark = 0L
val maxDelay = 1000L
override def getcurrentWatermark:Watermark ={
watermark = maxEventTime-maxDelay
new Watermark(watermark)
}
override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
val eventTime = element.split(",")(1).toLong
maxEventTime = Math.max(eventTime.maxEventTime)
println("当前事件戳:"+eventTime+"上一次的watermark:"+watermark)
eventTime
}
})
val result = source.map(line=>line.split(","))
.map(arr=>(arr(0),arr(1).toLong,arr(2).toInt))
.keyBy(0).timeWindow(Time.second(3)).sum(2)
result.print()
env.execute()
}
另一编码方式,封装了事件时间和watermark
def main(arg: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source = env.socketTextStream("ip01",8888)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(1)){
override def extractTimestamp(element: String): Long = element.split(",")(1).toLong
})
val result = source.map(line=>line.split(","))
.map(arr=>(arr(0),arr(1).toLong,arr(2).toInt))
.keyBy(0).timeWindow(Time.second(3)).sum(2)
result.print()
env.execute()
}
九.底层API(ProcessFuntion) FlinkAPI可以分为三层。 1)低级API(底层)API:提供了对时间和状态的细粒度控制,如果想要获取底层的数据时间戳(处理时间或事件时间),获取watermark,实现侧输出流以及状态编程,则需要用到低级API,可以说底层API能够做很多复杂的工作。 2)核心API:对底层API进一步封装,主要对流和批数据处理。简单易用。 3)高级API:再进一步封装,通过sql的方式对flink进行。不作为重点,了解即可。 ProcessWindowFunction->在窗口函数操作后使用。 比如:source…key.timewindow.process(ProcessWindowFuntion) ProcessFunction 比如:source…map/flatMap/filter.process(ProcessFunction) KeyedProcessFunction 比如:source…keyBy.process(KeyedProcessFunction)
def main(arg: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("ip01",8888)
.map(line)=>line.split(","))
.map(arr=Student(arr(0),arr(1),arr(2).toInt))
.process(new MySplitProcess)
val outputTag01 = new OutputTag[String]("stream_1")
val outputTag02 = new OutputTag[String]("stream_2")
val sideStream01 = source.getSideOutput(outputTag01)
val sideStream02 = source.getSideOutput(outputTag02)
source.print("主流--")
sideStream01 .print("侧输出流01--")
sideStream02 .print("测输出流02--")
env.execute()
}
class MySplitProcess extends ProcessFunntion[Student,Student]{
override def processElement(value: Student,ctx: ProcessFunction[Student, Student]#Context,out: Collector[Student]):Unit={
if(value.age>18){
out.collect(value)
}else if(value.age=18){
ctx.output(new OutputTag[String]("stream_1"),value.name+":"+value.age)
}else{
ctx.output(new OutputTag[String]("stream_2"),value.name)
}
}
}
def main(arg: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("ip01",8888)
.map(line)=>line.split(","))
.map(arr=Student(arr(0),arr(1),arr(2).toInt))
.keyBy("name")
.process(new MyKeyedProcess01)
source.print()
env.execute()
}
class MyKeyedProcess01 extends keyedProcessFunntion[Tuple,Student,String]{
override def processElement(value: Student,ctx: keyedProcessFunction[Tuple, Student, String]#Context,out: Collector[Student]):Unit={
val key:String = ctx.getCurrentKey.getField(0)
print("key为:" + key)
out.collect("hello")
}
}
def main(arg: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("ip01",8888)
.map(word=>(word,1))
.keyBy(0)
.process(new MyKeyedProcess02)
source.print()
env.execute()
}
class MyKeyedProcess02 extends keyedProcessFunntion[Tuple,(String,Int),String]{
lazy val countState = getRuntimeContext.getState(new ValueStateDescriptor[String]("count"),classOf[String]))
override def processElement(value: (String,Int), ctx: keyedProcessFunction[Tuple, (String,Int), String]#Context,out: Collector[String]):Unit = {
if(countStata.value() = null){
countState.updata("")
}
val key:String=ctx.getCurrentKey.getField(0)
val result = countState.value()+"-"+value_2
countState.updata(result)
out.collect(key+result)
}
}
十.Flink的状态编程 上述第二个例子以及涉及状态编程了。 Flink支持有状态的流计算和无状态的流计算 Flink的状态总体来说可以分为两种: 1.算子状态(Operator State) 算子的状态可以理解为直接在map,flatmap,filter这些转换类型算子上使用的状态,算子状态的作用范围是算子任务。 2.键控状态(keyed State)最常用的 可以在keyBy后使用的状态、键控状态的作用范围是每个key维护一个状态,不同key的状态互相之间不共享的。
使用Flink的状态编程: source…keyBy.process(ProcessFunction)–如果想获取时间戳、定时器、watermark以及状态编程可以使用ProcessFunction source…keyBy.map(RichMapFunction–如果不需要获取时间戳、定时器等,仅是做时间状态编程,则可以使用对应算子的富函数来实现 source…keyBy.flatMap(RichFlatMapFuntion) source…keyBy.filter(RichFliterFunction)
object MapDriver{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("ip01",8888)
val result = source.map(num=>"key",num.toInt)).keyBy(0).map(new MyMapFuncWithState )
result.print()
env.execute()
}
}
class MyMapFuncWithState extends RichMapFunction[(String,Int),Int]{
lazy val numState = getRuntimeContext.getState(new ValueStateDescriptor[Int]("num",classOf[Int]))
override def map(value: (String, Int)): Int = {
if(numState.value()==0){
numState.updata(value._2)
value._2
}else{
val result = value._2-numState.value()
numState.updata(value._2)
result
}
}
}
object FlatMapDriver{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("ip01",8888)
val result = source.map(num=>"key",num.toInt)).keyBy(0).flatMap(new MyFlatMapFuncWithState )
result.print()
env.execute()
}
}
class MyFlatMapFuncWithState extends RichMapFunction[(String,Int),String]{
lazy val numState = getRuntimeContext.getState(new ValueStateDescriptor[Int]("num",classOf[Int]))
override def flatMap(value: (String, Int), out: Collector[String]): Unit = {
if(numState.value()==0){
out.collect("初始数据:"+0)
out.collect("本次数据:"+value._2)
out.collect("初始数据:"+value._2)
numState.updata(value._2)
}else{
out.collect("上一次数据:"+numState.value())
out.collect("本次数据:"+value._2)
numState.updata(value._2)
val result = value._2-numState.value()
out.collect("计算之差:"+result)
}
}
}
object FilterDriver{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("ip01",8888)
val result = source.map(num=>"key",num.toInt)).keyBy(0).filter(new MyFliterMapFuncWithState)
result.print()
env.execute()
}
}
class MyFliterMapFuncWithState extends RichFilterFunction[(String,Int)]{
lazy val numState = getRuntimeContext.getListState(new ListStateDescriptor[Int]("num",classOf[Int]))
override def filter(value: (String, Int)): Boolean = {
if(!nums.get().iterator().hasNext){
nums.add(value._2)
true
}else{
nums.add(value._2)
var sum = 0
var count = 0
val numsIt= nums.get().iterator()
while(numsIt.hasNext){
sum = sum.numsIt.next()
count = count+1
}
val avg = sumn/count
println("当前历史数据的均值为:"+avg)
val flag = if((value._2-avg).abs)true else false
flag
}
}
}
十一.Flink的数据一致性保障(通过轻量级分布式快照实现) 在实时流处理系统中,数据一致性保障可以分为三种: 1.at most once 至多一次,实际上没有可靠性的委婉说法,即数据存在丢失的可能 2.at least once 至少一次,可以确保数据丢失,但可能会重复处理从而导致结果的不正确 3.exectly once精确一次,可以确保数据不丢失,并且精确处理,保证最终结果的正确性
曾经,at least once很流行,典型的代表框架Storm,Samza,底层可以保障at least once。即能够确保数据不丢失,但不能保障数据不重复处理,如果用户要实现精确一次予以,需要自己实现(比如通过幂等性操作) 后来,StormTrident和sparkStreaming框架出现了,地城支持了精确一次语义。但也付出了一定代价,都是以批的处理机制来处理流i数据,即在批的层面上实现了精确一次的语义,即一批数据要么全都成功,要么全部失败。这么做提升了吞吐量,但关键是牺牲了低延迟的特性。 目前Flink可以说是业界内唯一同时支持:高吞吐、低延迟、精确一次语义的实时流框架
轻量级分布式快照:举一个游戏 游戏过程与技术的参照 ①珠子:实时的数据 ②数珠子过程:实时流数据处理的过程 ③必须数准:精确一次语义 ④病人:实时流计算框架 ⑤脑补疾病:实时流计算框架工作时可能突发的各种异常状态 ⑥助手以及系皮筋一系列措施:flink的轻量级分布式快照机制
Flink的轻量级分布式快照机制: ①需要设定checkpoint(看做是哪个皮筋,在技术中成为快照)周期,比如3s ②需要设定checkpoint.dir,快照的存储位置。flink支持将快照存到内存中(测试环境)或文件系统中(生产环境)
chk一般为检查点屏障,可以看做是一条特殊的记录,会触发flink checkpoint相关的操作 flink的快照机制是属于异步处理机制,性能较高,此外当新快照生产后,老快照会被覆盖。所以快照资源不会占用很多。
object Driver{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(3000)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.setStateBackend(new RocksDBStateBackend("hdfs地址"))
val source = env.socketTextStream("ip01",8888)
val wordcount = source.flatMap(line=>line.split(" ")).map(word=>(word,1)).keyBy(0).sum(1)
wordcount.print()
env.execute()
}
}
注意:如果要想让flink实现真正意义上的精确一次,还需要确保端到端的数据一致性。所以在生产环境下,flink的数据源基本是从kafka过来。因为kafka是否被消费数据都会存在,chk会记录它的位置,flink和kafka是完美的配合
|