My Structured Streaming job used a custom foreach sink to write to Elasticsearch, and it kept dying after running for twenty to thirty minutes. The error log didn't point at any line of my own code as the source of the exception.
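For context, the sink followed the standard ForeachWriter lifecycle. This is a minimal sketch of that shape, not the original code; the host, port, index name, and the assumption that records arrive as JSON strings are all mine:

```scala
import org.apache.http.HttpHost
import org.apache.spark.sql.ForeachWriter
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentType

// Sketch of a foreach sink writing each record to Elasticsearch.
// "es-host", 9200, and the "events" index are placeholders.
class EsForeachWriter extends ForeachWriter[String] {
  @transient private var client: RestHighLevelClient = _

  // Called once per partition per epoch; build the client here.
  override def open(partitionId: Long, epochId: Long): Boolean = {
    client = new RestHighLevelClient(
      RestClient.builder(new HttpHost("es-host", 9200, "http")))
    true
  }

  override def process(json: String): Unit = {
    client.index(
      new IndexRequest("events").source(json, XContentType.JSON),
      RequestOptions.DEFAULT)
  }

  // Runs whether the epoch succeeded or failed; this is the only
  // place the client (and its I/O threads) can be released.
  override def close(errorOrNull: Throwable): Unit = {
    if (client != null) client.close()
  }
}
```

It is wired in with `df.writeStream.foreach(new EsForeachWriter).start()`, so open()/close() run once per partition per micro-batch; keep that in mind, because it means a missed close leaks one client per partition per batch.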
In the test environment the data volume was tiny, just a handful of records per minute, so I assumed it couldn't be a memory problem; the Executors tab in the Spark UI also showed very low memory usage on every executor.
Later, going through the job logs carefully, I found the problem:
```
21/08/24 17:33:56 org.apache.spark.internal.Logging.logError(Logging.scala:94) ERROR Utils: Aborting task
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at org.apache.hadoop.hdfs.DFSOutputStream.start(DFSOutputStream.java:780)
at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:316)
at org.apache.hadoop.hdfs.DFSClient.primitiveCreate(DFSClient.java:1271)
at org.apache.hadoop.fs.Hdfs.createInternal(Hdfs.java:105)
at org.apache.hadoop.fs.Hdfs.createInternal(Hdfs.java:60)
at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:605)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:696)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:692)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.create(FileContext.java:698)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:310)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:316)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.deltaFileStream$lzycompute(HDFSBackedStateStoreProvider.scala:95)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.deltaFileStream(HDFSBackedStateStoreProvider.scala:95)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.compressedStream$lzycompute(HDFSBackedStateStoreProvider.scala:96)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.compressedStream(HDFSBackedStateStoreProvider.scala:96)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:132)
at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$doExecute$6(FlatMapGroupsWithStateExec.scala:141)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561)
```
The error is "java.lang.OutOfMemoryError: unable to create new native thread": the JVM asked the OS for a new thread and was refused. Despite the OutOfMemoryError name, this usually isn't heap exhaustion; it means the process can no longer create native threads, either because an OS limit on threads/processes was hit or because native memory (each thread needs its own stack, typically around 1 MB) ran out.
Searching for this exception turned up suggestions that it's a ulimit problem (the open-files or max-user-processes limits), but after checking our configuration that wasn't the case here.
Later, while the job was running, I watched memory on the compute node with free, and resident memory kept climbing. (Earlier the Spark UI had shown very low usage, which threw me off: the UI reports JVM-managed memory, while thread stacks and the leaked clients' native resources live outside the heap.)
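A quicker way to confirm this than watching free is to track the executor JVM's live thread count. A minimal probe using the standard ThreadMXBean (not something the original job had; the name and interval are arbitrary):

```scala
import java.lang.management.ManagementFactory

// Thread-leak probe: periodically log the JVM's live thread count.
// A count that only ever grows points at leaked clients, because each
// un-closed RestHighLevelClient keeps its own pool of I/O threads alive.
object ThreadCountProbe {
  def start(intervalMs: Long = 10000L): Unit = {
    val bean = ManagementFactory.getThreadMXBean
    val t = new Thread(() => {
      while (true) {
        println(s"live threads: ${bean.getThreadCount}")
        Thread.sleep(intervalMs)
      }
    }, "thread-count-probe")
    t.setDaemon(true)
    t.start()
  }
}
```

The same number is visible from the shell via the NLWP column of ps; if it only ever climbs, the OOM above is just a matter of time.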
I then commented out the ES-writing code and the job ran without issue, which pinned the memory problem on the ES sink.
It turned out the code that closed the RestHighLevelClient had a hole: the client was not closed on every path. Each RestHighLevelClient owns an Apache async-HTTP I/O thread pool, so every client that is never closed leaves those threads running; accumulate enough of them and the OS refuses to create any more threads, which is exactly the error above.
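I no longer have the original snippet, but the hole was of this general shape: the close call sat behind a condition that wasn't always true. A hypothetical reconstruction, not the actual code:

```scala
// Hypothetical reconstruction of the bug: the client is only closed on
// the success path, so any failed epoch leaves a client (and its I/O
// threads) alive forever.
override def close(errorOrNull: Throwable): Unit = {
  if (errorOrNull == null) { // bug: skips close when the epoch failed
    client.close()
  }
}
```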
Once the RestHighLevelClient was reliably closed, the job ran normally again.
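The fix is simply to make the close unconditional and null-safe. A minimal sketch:

```scala
// Fixed: close the client on every path, success or failure.
override def close(errorOrNull: Throwable): Unit = {
  if (client != null) {
    try client.close()
    finally { client = null } // don't reuse a half-dead client
  }
}
```

An alternative worth considering is one lazily created client shared per executor JVM instead of one per open(); either way, the invariant is that every client eventually gets closed.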