本文简介
列出了flink需要学习的基本原理框架梗概,适合初学者的学习思路,具体flink要学些什么,可以解决什么问题, 细致的参数设置、脚本提交、API调用还是要参考官网。
截止到目前 flink已经更新到 1.13 1.12版本已经实现了sql的批流统一,和hive有了较好的交互
1.10之前的版本与之后相差较大,尤其是内存模型、flinktable和flinksql部分、提交脚本和参数设置也略有不同
【官网传送】
https://ci.apache.org/
正文!!!
【概述】
【 解决】 更高要求的实时性、大数据准确计算
【特点】
- 流处理、低延迟、高吞吐 事件驱动,
- ms延迟
- 多种窗口类型
- 时间语义+watermark机制 可解决数据乱序问题
- 状态管理+checkpoint 容灾机制 保证 execatly-once
- 7*24
【分层API (3)】
- process
- datastream/dataset(一般用这个)
- table/sql
【和其他框架对比】
- sparkstreaming 微批处理、准实时、 s级别、吞吐高、无法处理乱序数据、没有时间语义、窗口单一
- strom 快速 ms级别、准确性不高、吞吐不高、无法处理乱序数据
【基本编程模型】
source–transform–sink
【wordcount】
- 批处理:(以从文件中获取数据为例)
val env = ExecutionEnvironment.getExecutionEnvironment
val inputPath = "D:\\hello.txt"
val inputDS: DataSet[String] = env.readTextFile(inputPath)
val wordCountDS: AggregateDataSet[(String, Int)] = inputDS.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
wordCountDS.print()
- 流处理:(以从socket文本流中获取数据为例)
val params: ParameterTool = ParameterTool.fromArgs(args)
val host: String = params.get("host")
val port: Int = params.getInt("port")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val textDstream: DataStream[String] = env.socketTextStream(host, port)
import org.apache.flink.api.scala._
val dataStream: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s")).filter(_.nonEmpty).map((_, 1)).keyBy(0).sum(1)
dataStream.print().setParallelism(1)
env.execute("Socket stream word count")
【架构和集群角色】
job manager 整个作业管理 负责任务调度、slot分配、checkpoint记录位置等 task manager 整个任务管理 负责运行具体任务,slot在此运行 resource manager 整个资源管理 负责具体的资源分配 dispatcher client restful接口 相当于flink自带的webui
【任务提交】
-
app 提交应用程序 到dispatcher(jar包等) 在client 生成 stream Graph,根据代码算子流程 再根据 stream Graph 生成 job Graph ,合并operator chain 合并条件:one to one操作+并行度一致 为什么要合并?属于一种优化,减少io和网络传输 默认会合并,在相同slot上执行,可以设置不合并 -
dispatcher 把请求转发给 job manager -
job manager 做任务分析管理 生成调度器 把job Graph转化成exection Gragh,根据并行度展开 确定需要的slot, 向 resource manager 请求slot资源,请求启动task manager -
task manager启动之后,和resource manager、job manager做心跳链接、交互等等, JM RPC进行远程调用启动TM 根据物理执行图执行具体任务 准备拿到slot进行具体的任务执行
【部署模式】
【yarn提交脚本】 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/yarn/ 【内存参数配置】 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/memory/mem_setup/
- standalone
- yarn
session模式 per-job模式(开发中用这个模式)一个job一个集群 - k8s容器化部署
提交per-job提交脚本
flink run -m yarn-cluster -yn 2 -yjm 1024 -yqu root.info -ys 6 -ytm 5120 -p 12 -c $class -ynm indexCount
$jar $kuduMaster $kuduTable $groupId $botstrap $topic
flink run -t yarn-per-job
【任务调度】
-
四个图 stream gragh(程序流图) client 生成的 代码中算子的流程 job gragh(工作流图) client 由stream gragh生成的 operator chain 合并优化 exection gragh(执行流图) job manager 由 job gragh 生成的 并行度展开 物理执行图 taskmanager具体执行的 -
slot 是什么 在taskmanager中分配的内存,表示并行处理的能力 每个算子真正运行的内存 设置 一般设置为该节点的cpu核数,最大利用 共享slot 不同任务可以共享,提高slot利用率 并行设置 设置优先级:代码中单个算子>代码中env>webui中设置>yaml配置文件中
【相关算子】
-
source addsource(具体看源) -
transform map、flatmap、max、sum keyBy 【原理】hash重分区 DataStream → KeyedStream
shuffle
随即分区
rebalance
平均分配到每个区,做优化用
rescale
也是重分区,
比rebalance高效
-
sink addsink
【window操作】
【带具体例子的博文】https://blog.csdn.net/qq_42596142/article/details/103941520?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-1.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-1.control
-
是什么 无限流切成有限流 本质是 bucket,把属于窗的数据丢到相应的bucket中 左闭右开 [ ) 开窗之前必须 keyBy -
编程模型:keyBy+窗口分配器(windowAssigner)+窗口相应操作(windowFunction) -
窗口分配器分类 时间窗口(timeWindow) 滚动(tumblingWindow):窗口固定,时间对齐,无重叠 滑动(slidingWindow):窗口固定,有重叠,需要指定滑动步长 会话(sessionWindow): 根据元素分组 没有固定的开始和结束时间 元素来了就开启,一段时间没有就结束 可以设置gap控制 时间 全局(globalWindow):自定义触发器和计算方法 计数窗口 (countWindow) 滚动 滑动 -
窗口函数 分类 增量窗口:来一个计算一个,保持状态再叠加,更高效 全窗口:缓存一整个窗口的数据,窗口结束再做计算 增量窗口(.recduce) reduceFunction 实现简单的累加需求,数据量小,可以写lambda表达式直接实现 具体使用 .keyBy.window.reduce(new reduceFunction(){}) aggregateFunction 实现复杂需求,输入、中间、输出类型不一致的需求 具体使用.keyBy.window.reduce(new aggregateFunction(){}) 全窗口(.process) processFunction 具体使用 .keyBy.window.process(new processWindowFunction (){})
【时间语义】
-
是什么 衡量不同时间,为解决流数据乱序问题 1.12前默认是处理时间,1.12之后是事件时间 -
分类 事件时间(eventTime) 数据真正产生的时间 可以从数据的某个时间提取 处理时间(processingTime) 机器时间、系统时间 进入flink的时间(ingestionTime) -
具体设置 结合watermark一起使用 env.setxxxx()
【watermark】
-
是什么 用来处理乱序数据,延迟触发机制 插在数据流中的特殊结构–时间戳 最晚关窗时间,到了某个watermark触发窗口关闭,再来的就不计算了 -
分类 周期性 打点式 -
机制 设置 watermark=maxEventTs-lates 事件时间 最大可延迟的乱序时间:自己设定的,可以根据经验值,也可以根据自己的计算 -
传递 上游:把上游的watermark放到watermark partition中 下游:把最小的watermark广播到下游
【事件状态】
-
是什么 记录事件的操作,知道前面的算子发生了什么 存状态有不同的数据结构 -
分类 算子状态 键控状态(keyedStream) 只能访问到当前的key
【状态后端】
-
是什么 本地状态管理(TM) checkpoint远程存储 -
分类 memoryStateBacked-- jvm内存中 FsStateBacked–文件系统 RocksDBStateBacked–rocksdb kv数据库中 -
使用 可在代码env中配置,也可在yaml中配置
【checkpoint】
-
是什么 标记,插在数据流中的特殊数据结构–barries 为了保持状态一致性语义(excatly-once) 保存任务的状态 分布式快照算法 -
机制 JM发起的,并保存checkpoint的拓扑 具体存放位置由状态后端决定 一般存到 Fs 所有任务恰好处理完相同的数据输入,才触发checkpoint 异步的,不停数据处理 上游对齐 遇到数据处理数据 遇到barries,先把数据缓存起来,处理完checkpoint再进行数据处理 不同分区任务都处理完barries,返回给JM,checkpoint才算完成 下游广播
【状态一致性】
-
是什么 三种处理语义 at-most-once至多一次,可丢 at-least-once至少一次,可重复 excatly-once精确一次,不丢不重复 -
怎么实现 flink内部 checkpoint机制 source 可以保存和恢复、回退:存offset sink 保证幂等性: 可重复做,但结果不重复 如hashmap的key、redis 具有事务性:(预写日志、两阶段提交) 和flink的checkpoint绑定事物,checkpoint完成再提交,不完成就不提交 -
kafka实现状态一致 source:管理offset sink: 两阶段提交(内部自动实现了接口) 注意设置kafkasink参数 设置 sematic.excatly-once 消费者隔离级别:read-commited kafka的timeout设置的大一些
【内存管理】
1.10之后做了重大改变 管理的是JM和TM的内存(区别spark中,管理的是executor的内存) 堆上+堆外(为了防止fullGC) 区分flink内存和进程内存 参数设置 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/memory/mem_setup/
|