IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 记一次flink1.13 NoRestartBackoffTimeStrategy异常处理过程 -> 正文阅读

[大数据]记一次flink1.13 NoRestartBackoffTimeStrategy异常处理过程

记一次flink1.13 NoRestartBackoffTimeStrategy异常处理过程


异常如下:

2021-08-10 15:28:21
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
	at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: open() failed.connection disabled
	at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.openInputFormat(JdbcRowDataInputFormat.java:122)
	at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:227)
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.sql.SQLException: connection disabled
	at com.alibaba.druid.pool.DruidPooledConnection.checkStateInternal(DruidPooledConnection.java:1170)
	at com.alibaba.druid.pool.DruidPooledConnection.checkState(DruidPooledConnection.java:1155)
	at com.alibaba.druid.pool.DruidPooledConnection.setAutoCommit(DruidPooledConnection.java:703)
	at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.openInputFormat(JdbcRowDataInputFormat.java:114)
	... 5 more
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 507,117 milliseconds ago.  The last packet sent successfully to the server was 507,120 milliseconds ago.
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
	at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:981)
	at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2176)
	at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1964)
	at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3316)
	at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:463)
	at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3040)
	at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2288)
	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2681)
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2551)
	at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861)
	at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1962)
	at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeQuery(DruidPooledPreparedStatement.java:227)
	at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:208)
	... 4 more
Caused by: java.io.EOFException: Can not read response from server. Expected to read 4 bytes, read 3 bytes before connection was unexpectedly lost.
	at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2957)
	at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2159)
	... 15 more

这个异常任务是我根据flink1.9中的任务升级到1.13的,本地windows直接执行,成功,一在正式环境当中执行就报异常。。。

解决过程

第一阶段

	一开始我以为是mysql的一些配置问题,然后就在网上搜索,根据下面一些参数进行调整
以下这些配置项单位都是秒,在mysql命令行中可以使用show global variables like '变量名';可查询配置值。

connect_timeout:连接响应超时时间。服务器端在这个时间内如未连接成功,则会返回连接失败。

wait_timeout:连接空闲超时时间。与服务器端无交互状态的连接,直到被服务器端强制关闭而等待的时间。
可以认为是服务器端连接空闲的时间,空闲超过这个时间将自动关闭。

interactive_timeout :连接空闲超时时间。与服务器端无交互状态的连接,直到被服务器端强制关闭而等
待的时间。

    interactive_timeout和wait_timeoutu意义虽然相同,但是有使用对象有本质的区别。interactive_timeout
针对交互式连接(比如通过mysql客户端连接数据库),wait_timeout针对非交互式连接(比如一般在PHP中使用PDO
连接数据库,当然你可以设置CLIENT_INTERACTIVE选项来改变)。所谓的交互式连接,即在mysql_real_connect()
函数中使用了CLIENT_INTERACTIVE选项。

net_read_timeout :数据读取超时时间。在终止读之前,从一个连接获得数据而等待的时间秒数;当服务正在从
客户端读取数据时,net_read_timeout控制何时超时。即客户端执行数据读取,等待多少秒仍未执行成功时自动
断开连接。 

net_write_timeout:数据库写超时时间。和net_read_timeout意义类似,在终止写之前,等待多少秒把block
写到连接;当服务正在写数据到客户端时,net_write_timeout控制何时超时。

slave-net-timeout:从库延后同步的时间,当slave认为连接master的连接有问题时,就等待N秒,然后断开连
接,重新连接master

slave-net-timeout在主从同步时从库上起作用;connect_timeout:在获取连接阶段起作用;interactive_timeout
和wait_timeout:在连接空闲阶段起作用;net_read_timeout和net_write_timeout:则是在连接执行时起作用。

但是这些参数都没能把问题解决掉,期间也把flink-connector-jdbc中的jdbc连接替换成了从alibaba druid中取出,都不行。单纯的调整mysql的一些参数看来行不通了

第二阶段

flink web界面上可以看出每次出错的地方都是sourcescan那一步,单并行度执行查询全表数据。
而单纯的调整mysql参数行不通,我单表数据量也达到了6000多万条,就想着能不能改成并行查询。说干就干,我就开始debug源码,查看jdbc连接器的执行流程,然后尝试着修改源码。。。改着改着我就突然缓过劲儿来了,这么火的一款大数据流式处理框架会考虑不到这种情况吗?并且flink中文邮件组中也有人说这种异常已经修复过了。然后我就上官网仔细查看jdbc连接器的一些参数,然后就发现了
scan.partition.column: The column name used for partitioning the input.
scan.partition.num: The number of partitions.
scan.partition.lower-bound: The smallest value of the first partition.
scan.partition.upper-bound: The largest value of the last partition.
这是这是官网的具体描述。
然后我就把参数加到了代码中,执行发现确实查询并行度已经改变,但是执行速度还是非常慢,我看了下,查询传输1000万左右数据都耗费了近40分钟,并且我 iftop -P 监控任务机mysql的流量,发现基本速度0b了。。最后我忍不了了,就手动把任务停止了。

第三阶段

由于之前查看web任务界面发现数据传输非常慢,同时taskManager配置的task heap内存很快就要用完了。并且当task heap内存即将耗尽时,数据传输基本处于停滞状态。我就考虑增大task heap内存了,通过参考flink 1.10之改进的TaskManager内存模型与配置
调整 taskmanager.memory.network.fraction 和 taskmanager.memory.managed.fraction 来增大task heap内存,问题就突然解决了,好草率~~~

至此,问题得以解决

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-14 14:08:30  更:2021-08-14 14:09:26 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:55:34-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码