The previous article covered fetching YARN application information through the REST API; this one walks through submitting an application to a YARN cluster through the YARN REST API. It has two parts:
- Use cases
- Usage and caveats
1. Use cases
Normally we submit jobs directly on the cluster, via the command line or an integrated scheduling tool, e.g. submitting a Spark job. Needing to submit jobs to a YARN cluster through the REST API instead usually comes down to one of these situations:
- An in-house platform integrating with an existing big-data platform
- Jobs generated by other automation tools
2. Usage and caveats
Following the official documentation, submitting a job to YARN via the REST API works in three steps:
Request a new app_id from the YARN cluster -> upload the application files to HDFS -> set the REST API request parameters and submit the application to the YARN cluster
Each stage is described below.
2.1 Request a new app_id from the YARN cluster
Before actually submitting an application to the YARN cluster, you need to request an app_id from the ResourceManager:
- POST http://rm-http-address:port/ws/v1/cluster/apps/new-application
- Response
{
  "application-id": "application_1404198295326_0003",
  "maximum-resource-capability": {
    "memory": 8192,
    "vCores": 32
  }
}
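For reference, a minimal Python sketch of this call using requests; the RM address is a placeholder, while the endpoint and response fields are as shown above:

```python
# Minimal sketch: request a new application ID from the ResourceManager.
# RM_ADDRESS is a placeholder -- replace it with your rm-http-address:port.
import requests

RM_ADDRESS = "http://rm-http-address:8088"

resp = requests.post(f"{RM_ADDRESS}/ws/v1/cluster/apps/new-application")
resp.raise_for_status()
new_app = resp.json()

app_id = new_app["application-id"]  # e.g. application_1404198295326_0003
max_res = new_app["maximum-resource-capability"]
print(app_id, max_res["memory"], max_res["vCores"])
```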
2.2 上传需要提交的应用文件到hdfs上
rest api 向 yarn 集群提交应用,必须依赖可执行文件,不能直接执行一个命令。这个限制很离谱呀。。。 并且,只能通过hdfs接口获取上传的可执行文件的文件大小【length】、最后的修改时间等信息【modificationTime】,这,更离谱了,,, 使用hdfs rest api 查询上传的文件信息方式如下:
- GET https://129.211.130.233:30002/webhdfs/v1/user/survey/sqoop_demo.sh?op=GETFILESTATUS
- Response
{
"FileStatus": {
"accessTime": 1629362470581,
"blockSize": 134217728,
"childrenNum": 0,
"fileId": 2771229,
"group": "hadoop",
"length": 600,
"modificationTime": 1629362470665,
"owner": "hadoop",
"pathSuffix": "",
"permission": "644",
"replication": 1,
"storagePolicy": 0,
"type": "FILE"
}
}
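A minimal Python sketch of this lookup, reusing the placeholder address and path from the example above (verify=False is only there because the example endpoint serves HTTPS on a bare IP; adjust for your cluster):

```python
# Minimal sketch: fetch length and modificationTime of the uploaded file
# via WebHDFS -- the submission payload requires both values.
import requests

WEBHDFS_ADDRESS = "https://129.211.130.233:30002"  # example address from above
path = "/user/survey/sqoop_demo.sh"

resp = requests.get(
    f"{WEBHDFS_ADDRESS}/webhdfs/v1{path}",
    params={"op": "GETFILESTATUS"},
    verify=False,  # example endpoint has no trusted cert; tighten in production
)
resp.raise_for_status()
status = resp.json()["FileStatus"]

size, timestamp = status["length"], status["modificationTime"]
print(size, timestamp)  # 600, 1629362470665 in the example above
```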
2.3 Set the REST API request parameters and submit the application to the YARN cluster
The request takes a large number of parameters, too many to list one by one here; see the official documentation. Two things to note:
- POST http://rm-http-address:port/ws/v1/cluster/apps
- The submission must go through an authentication filter, or it will fail. From the official docs:
Please note that in order to submit an app, you must have an authentication filter
setup for the HTTP interface. The functionality requires that a username is set in
the HttpServletRequest. If no filter is setup, the response will be an
“UNAUTHORIZED” response.
A sample request body follows; note that size and timestamp must match the length and modificationTime returned by GETFILESTATUS above:
{
  "application-id": "application_1624519467135_7577",
  "application-name": "test_yarn_api",
  "am-container-spec": {
    "local-resources": {
      "entry": [
        {
          "key": "sqoop_demo.sh",
          "value": {
            "resource": "hdfs://10.20.10.4:4007/user/survey/sqoop_demo.sh",
            "type": "FILE",
            "visibility": "APPLICATION",
            "size": 600,
            "timestamp": 1629362470665
          }
        }
      ]
    },
    "commands": {
      "command": "bash sqoop_demo.sh 1><LOG_DIR>/WC.stdout 2><LOG_DIR>/WC.stderr"
    }
  },
  "max-app-attempts": 1,
  "application-type": "MAPREDUCE"
}
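Putting the pieces together, a minimal Python sketch of the submission call; the RM address is a placeholder, and the app_id, HDFS URI, size and timestamp are the example values from the calls above, not something you can run as-is:

```python
# Minimal sketch: submit the application with the payload shown above.
# app_id comes from new-application; size/timestamp come from GETFILESTATUS.
import requests

RM_ADDRESS = "http://rm-http-address:8088"  # placeholder RM address
app_id = "application_1624519467135_7577"   # from new-application
size, timestamp = 600, 1629362470665        # from GETFILESTATUS

payload = {
    "application-id": app_id,
    "application-name": "test_yarn_api",
    "am-container-spec": {
        "local-resources": {
            "entry": [{
                "key": "sqoop_demo.sh",
                "value": {
                    "resource": "hdfs://10.20.10.4:4007/user/survey/sqoop_demo.sh",
                    "type": "FILE",
                    "visibility": "APPLICATION",
                    "size": size,
                    "timestamp": timestamp,
                },
            }],
        },
        "commands": {
            "command": "bash sqoop_demo.sh 1><LOG_DIR>/WC.stdout 2><LOG_DIR>/WC.stderr"
        },
    },
    "max-app-attempts": 1,
    "application-type": "MAPREDUCE",
}

# A 202 Accepted only means the request was accepted; check the app state
# afterwards to see whether it actually ran.
resp = requests.post(f"{RM_ADDRESS}/ws/v1/cluster/apps", json=payload)
print(resp.status_code, resp.text)
```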
After submitting, you can check on the YARN ResourceManager web UI whether the application went through, or query its status via the YARN REST API:
- GET http://rm-http-address:port/ws/v1/cluster/apps/{appid}
- Response
{
"app": {
"id": "application_1624519467135_7571",
"user": "hadoop",
"name": "test_yarn_api",
"queue": "root.default",
"state": "FAILED",
"finalStatus": "FAILED",
"progress": 0.0,
"trackingUI": "History",
"trackingUrl": "http://xxxx:5004/cluster/app/application_1624519467135_7571",
"diagnostics": "Application application_1624519467135_7571 failed 1 times (global limit =2; local limit is =1) due to AM Container for appattempt_1624519467135_7571_000001 exited with exitCode: -1000\nFailing this attempt.Diagnostics: File does not exist: hdfs://10.20.10.4:4007/user/hadoop/NAMENODE/user/survey/sqoop_demo.sh\njava.io.FileNotFoundException: File does not exist: hdfs://10.20.10.4:4007/user/hadoop/NAMENODE/user/survey/sqoop_demo.sh\n\tat org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1444)\n\tat org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1437)\n\tat org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)\n\tat org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1452)\n\tat org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)\n\tat org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)\n\tat org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)\n\tat org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)\n\tat java.security.AccessController.doPrivileged(Native Method)\n\tat javax.security.auth.Subject.doAs(Subject.java:422)\n\tat org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)\n\tat org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:359)\n\tat org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n\nFor more detailed output, check the application tracking page: http://10.20.10.4:5004/cluster/app/application_1624519467135_7571 Then click on links to logs of each attempt.\n. Failing the application.",
"clusterId": 1624519467135,
"applicationType": "MAPREDUCE",
"applicationTags": "",
"priority": 0,
"startedTime": 1629362098025,
"finishedTime": 1629362098574,
"elapsedTime": 549,
"amContainerLogs": "http://xxxx:5008/node/containerlogs/container_e09_1624519467135_7571_01_000001/hadoop",
"amHostHttpAddress": "xxxx:5008",
"allocatedMB": -1,
"allocatedVCores": -1,
"runningContainers": -1,
"memorySeconds": 35,
"vcoreSeconds": 0,
"queueUsagePercentage": 0.0,
"clusterUsagePercentage": 0.0,
"preemptedResourceMB": 0,
"preemptedResourceVCores": 0,
"numNonAMContainerPreempted": 0,
"numAMContainerPreempted": 0,
"preemptedMemorySeconds": 0,
"preemptedVcoreSeconds": 0,
"logAggregationStatus": "SUCCEEDED",
"unmanagedApplication": false,
"appNodeLabelExpression": "",
"amNodeLabelExpression": ""
}
}
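A small Python sketch for polling until the application reaches a terminal state (same placeholder RM address as above; the example response shows why checking diagnostics matters, since this particular run FAILED with a FileNotFoundException):

```python
# Minimal sketch: poll the application state until it is terminal.
import time
import requests

RM_ADDRESS = "http://rm-http-address:8088"  # placeholder RM address
app_id = "application_1624519467135_7571"   # example app from above

while True:
    app = requests.get(f"{RM_ADDRESS}/ws/v1/cluster/apps/{app_id}").json()["app"]
    print(app["state"], app["finalStatus"], app["progress"])
    if app["state"] in ("FINISHED", "FAILED", "KILLED"):
        # diagnostics explains failures, e.g. the missing-HDFS-file error above
        print(app["diagnostics"])
        break
    time.sleep(5)
```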
My take
YARN's REST API comes with quite a few restrictions, and the overall experience is fairly poor: you must request an app_id before submitting, the job must be backed by an executable file, and so on. If you genuinely have this use case, the following approaches may be more reasonable:
- Have the in-house platform or automation tool integrate directly with the big-data cluster's scheduling system, and let the scheduler control job submission. Community schedulers generally expose fairly complete, much friendlier RESTful APIs; DolphinScheduler is one example.
- Call Linux scripts or commands directly from Java to submit jobs to the cluster, e.g. with ProcessBuilder or JSch (a sketch follows below).
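For comparison, a minimal sketch of that second approach, using Python's subprocess module in place of the Java ProcessBuilder mentioned above (the spark-submit command line is hypothetical):

```python
# Invoke the cluster's own CLI and let it handle submission, instead of
# hand-building the REST payload. Hypothetical spark-submit command line.
import subprocess

cmd = ["spark-submit", "--master", "yarn", "--deploy-mode", "cluster", "demo.py"]
result = subprocess.run(cmd, capture_output=True, text=True)
print(result.returncode)
print(result.stdout)
print(result.stderr)
```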