Flink Job Lifecycle
Introduction
Based on the official reference guide, this post walks through the Flink job lifecycle: it explains the Flink Command-Line Interface and the various ways to work with checkpoints and savepoints.
Job Lifecycle Management
Submitting a Job
$ ./bin/flink run \
--detached \
./examples/streaming/StateMachineExample.jar
The --detached flag makes the submission command return as soon as the job is submitted; the job itself keeps running in the background.
Listing Jobs
$ ./bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
01.09.2021 16:04:55 : 3d24b0cfb2b2f18bf69f4ea931c59ab9 : Socket Window WordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
The list command gives us the job ID, which is needed to trigger a savepoint for that job.
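For scripting, the job ID can be pulled out of the list output. The helper below is a hypothetical sketch that assumes the exact line format shown above (a 32-character lowercase hex ID between colons); it is not part of the Flink tooling.

```python
import re
from typing import List

def running_job_ids(list_output: str) -> List[str]:
    """Extract the JobIDs of RUNNING jobs from `flink list` output.

    Assumes the sample format shown above:
    '01.09.2021 16:04:55 : <job-id> : <name> (RUNNING)'
    """
    ids = []
    for line in list_output.splitlines():
        m = re.search(r":\s*([0-9a-f]{32})\s*:.*\(RUNNING\)", line)
        if m:
            ids.append(m.group(1))
    return ids

sample = (
    "------------------ Running/Restarting Jobs -------------------\n"
    "01.09.2021 16:04:55 : 3d24b0cfb2b2f18bf69f4ea931c59ab9 : "
    "Socket Window WordCount (RUNNING)\n"
)
print(running_job_ids(sample))  # ['3d24b0cfb2b2f18bf69f4ea931c59ab9']
```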
Creating a Savepoint
$ ./bin/flink savepoint $JOB_ID /tmp/flink-savepoints
Triggering savepoint for job 3d24b0cfb2b2f18bf69f4ea931c59ab9.
Waiting for response...
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-3d24b0-991b2ef768a9
You can resume your program from this savepoint with the run command.
Note: if no default savepoint directory is configured (state.savepoints.dir), the target directory has to be passed explicitly when triggering the savepoint.
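The precedence implied by this note can be sketched as follows; resolve_savepoint_dir is a hypothetical helper for illustration, not part of Flink:

```python
from typing import Optional

def resolve_savepoint_dir(cli_path: Optional[str], config: dict) -> str:
    """Directory precedence from the note above: an explicit path on the
    command line wins; otherwise fall back to the configured
    state.savepoints.dir; with neither, triggering a savepoint fails."""
    if cli_path:
        return cli_path
    configured = config.get("state.savepoints.dir")
    if configured:
        return configured
    raise ValueError("No savepoint directory: pass one on the command line "
                     "or set state.savepoints.dir in conf/flink-conf.yaml")

print(resolve_savepoint_dir(None, {"state.savepoints.dir": "file:/tmp/flink-savepoints"}))
# file:/tmp/flink-savepoints
```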
Inspecting the Metadata under the Returned Path
(The _metadata file is binary, so dumping it directly produces mostly unreadable bytes. The printable strings in it do reveal the job's state names and serializer snapshots, e.g. window-contents as KEYED_STATE with a PojoSerializerSnapshot for SocketWindowWordCount$WordWithCount, StringSerializerSnapshot and LongSerializerSnapshot for the word and count fields, and _timer_state/processing_window-timers and _timer_state/event_window-timers with a TimeWindowSerializerSnapshot.)
Resuming from a Savepoint
$ ./bin/flink run \
--detached \
--fromSavepoint /tmp/flink-savepoints/savepoint-3d24b0-991b2ef768a9 \
./examples/streaming/StateMachineExample.jar
Job has been submitted with JobID 78a9562714f39c5fab23e1d1f9e895b7
Checking the job status at this point shows that the state from the earlier run has been preserved.
Starting a Job from a savepoint while discarding state that cannot be restored
$ ./bin/flink run \
--fromSavepoint <savepointPath> \
--allowNonRestoredState ...
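Conceptually, --allowNonRestoredState controls what happens to savepoint state that no longer maps to any operator in the new job graph. The sketch below is a toy model of that behavior for illustration only, not Flink's actual restore code:

```python
def restore(savepoint_state: dict, job_operators: set,
            allow_non_restored_state: bool = False) -> dict:
    """Toy model of --allowNonRestoredState: every state entry in the
    savepoint must match an operator in the new job graph, unless the
    flag says that leftover state may simply be dropped."""
    restored, leftover = {}, {}
    for op_id, state in savepoint_state.items():
        (restored if op_id in job_operators else leftover)[op_id] = state
    if leftover and not allow_non_restored_state:
        raise RuntimeError(f"Savepoint contains state for unknown operators: {sorted(leftover)}")
    return restored

snapshot = {"source-1": "offsets", "window-1": "counts", "old-map": "cache"}
# The operator 'old-map' was removed from the job, so a plain restore fails:
# restore(snapshot, {"source-1", "window-1"})  # raises RuntimeError
# ...unless non-restored state is explicitly allowed:
print(restore(snapshot, {"source-1", "window-1"}, allow_non_restored_state=True))
# {'source-1': 'offsets', 'window-1': 'counts'}
```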
Disposing a Savepoint
$ ./bin/flink savepoint \
--dispose \
/tmp/flink-savepoints/savepoint-3d24b0-991b2ef768a9/_metadata \
$JOB_ID
Disposing savepoint '/tmp/flink-savepoints/savepoint-3d24b0-991b2ef768a9/_metadata'.
Waiting for response...
Savepoint '/tmp/flink-savepoints/savepoint-3d24b0-991b2ef768a9/_metadata' disposed.
Checking the result of the deletion
Gracefully Stopping a Job
"Gracefully" means that when stopping the job, all sources first emit events up to a final checkpoint barrier and a savepoint is triggered; once the savepoint has completed successfully, the whole job is finished by calling cancel().
$ ./bin/flink stop --savepointPath /tmp/flink-savepoints $JOB_ID
Suspending job "8768393e7a9cba7e330f6caf41cd16f1" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-876839-187041c83c39
Using --drain
If the --drain flag is specified, a MAX_WATERMARK is emitted before the last checkpoint barrier. This fires all registered event-time timers and thereby flushes out any state that is waiting for a specific watermark, e.g. windows. The job keeps running until all sources have shut down properly. This allows the job to finish processing all in-flight data, which can also produce some records to process after the savepoint taken while stopping.
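A minimal simulation of why --drain flushes windowed state: advancing the watermark to Long.MAX_VALUE makes every registered event-time timer due at once. The TimerService class below is purely illustrative and is not Flink's API:

```python
MAX_WATERMARK = 2**63 - 1  # Long.MAX_VALUE, what --drain emits before the final barrier

class TimerService:
    """Toy event-time timer service (illustration only, not Flink's API)."""
    def __init__(self):
        self.timers = []  # registered event-time timestamps (e.g. window ends)
        self.fired = []

    def register(self, ts: int):
        self.timers.append(ts)

    def advance_watermark(self, watermark: int):
        # Every timer at or below the watermark fires; the rest keep waiting.
        due = [t for t in self.timers if t <= watermark]
        self.timers = [t for t in self.timers if t > watermark]
        self.fired.extend(due)

svc = TimerService()
for end_of_window in (10_000, 20_000, 30_000):
    svc.register(end_of_window)

svc.advance_watermark(15_000)         # normal progress: only the first window fires
svc.advance_watermark(MAX_WATERMARK)  # --drain: every remaining timer fires
print(svc.fired)   # [10000, 20000, 30000]
print(svc.timers)  # [] -- no state is left waiting for a watermark
```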
Note: unless you intend to stop the job permanently, do not drain the whole pipeline lightly; otherwise, restoring the job afterwards may lead to incorrect results.
Ungracefully Cancelling a Job
$ ./bin/flink cancel $JOB_ID
Cancelling job cca7bc1061d61cf15238e92312c2fc20.
Cancelled job cca7bc1061d61cf15238e92312c2fc20.
Commands Supported by the CLI
Action | Purpose |
---|---|
run | This action executes jobs. It requires at least the jar containing the job. Flink- or job-related arguments can be passed if necessary. |
run-application | This action executes jobs in Application Mode. Other than that, it requires the same parameters as the run action. |
info | This action can be used to print an optimized execution graph of the passed job. Again, the jar containing the job needs to be passed. |
list | This action lists all running or scheduled jobs. |
savepoint | This action can be used to create or dispose savepoints for a given job. It might be necessary to specify a savepoint directory besides the JobID, if the state.savepoints.dir parameter was not specified in conf/flink-conf.yaml. |
cancel | This action can be used to cancel running jobs based on their JobID. |
stop | This action combines the cancel and savepoint actions to stop a running job but also create a savepoint to start from again. |
The CLI talks to the cluster over HTTP, and the same interface exposes much more information about the cluster's state; I will cover that in a later update.
Selecting Deployment Targets
Submitting a job to different targets
- YARN
  - ./bin/flink run --target yarn-session : Submission to an already running Flink on YARN cluster
  - ./bin/flink run --target yarn-per-job : Submission spinning up a Flink on YARN cluster in Per-Job Mode
  - ./bin/flink run-application --target yarn-application : Submission spinning up Flink on YARN cluster in Application Mode
- Kubernetes
  - ./bin/flink run --target kubernetes-session : Submission to an already running Flink on Kubernetes cluster
  - ./bin/flink run-application --target kubernetes-application : Submission spinning up a Flink on Kubernetes cluster in Application Mode
- Standalone
  - ./bin/flink run --target local : Local submission using a MiniCluster in Session Mode
  - ./bin/flink run --target remote : Submission to an already running Flink cluster
The --target will overwrite the execution.target specified in the conf/flink-conf.yaml .
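That precedence can be sketched in one line; effective_target is a hypothetical helper, and the "local" fallback used here is only illustrative:

```python
def effective_target(cli_args: dict, flink_conf: dict) -> str:
    """Sketch of the precedence stated above: --target on the command line
    overrides execution.target from conf/flink-conf.yaml.
    (The 'local' default below is illustrative, not Flink's documented default.)"""
    return cli_args.get("target") or flink_conf.get("execution.target", "local")

conf = {"execution.target": "yarn-per-job"}
print(effective_target({}, conf))                                # yarn-per-job
print(effective_target({"target": "kubernetes-session"}, conf))  # kubernetes-session
```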