1. Creating an Oozie Workflow in Hue
(1) local mode
Log in to Hue and create an Oozie workflow. Click the [Workspace] button; the workspace opens in a new page. The steps are:
- enter the Workspace and upload the application JAR into the lib directory;
- add a Spark2 action, selecting the directory containing the JAR and the JAR file name;
- fill in the MainClass and attach the JAR file;
- configure the action to use Spark2 (otherwise Spark1 is used by default);
- save the Oozie workflow and click Submit.
The job runs successfully.
(2) yarn client mode
Enter the Workspace, open the lib directory, and upload the JAR. Drag in a Spark Program action, fill in the application's main class name, and set the configuration parameters. Click the small gear icon to review the remaining parameters, then save the configuration and submit the job. The workflow definition that Hue generates automatically is shown below:
<workflow-app name="wf_spark2_client" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-08c4"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-08c4">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>spark2</value>
                </property>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>default</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>client</mode>
            <name>YarnSparkPi</name>
            <class>org.apache.spark.examples.SparkPi</class>
            <jar>spark-examples_2.11-2.2.0.jar</jar>
            <spark-opts>--driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 --conf spark.yarn.historyServer.address=http://bigdata-cdh01:18080 --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://bigdata-cdh01:8020/spark/eventLogs --conf spark.yarn.jars=hdfs://bigdata-cdh01:8020/spark/jars/*</spark-opts>
            <file>/user/root/oozie_works/examples/apps/hue-oozie-1646279075.31/lib/spark-examples_2.11-2.2.0.jar#spark-examples_2.11-2.2.0.jar</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>
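When the same workflow.xml is run outside Hue (for example via the `oozie` CLI), the EL variables `${jobTracker}` and `${nameNode}` are usually supplied through a job.properties file. A minimal sketch, assuming the hosts shown in the spark-opts above and the standard port from the Java client code later in this document; adjust to your cluster:

```properties
# Hypothetical job.properties for the workflow above (hosts/paths are assumptions)
nameNode=hdfs://bigdata-cdh01:8020
jobTracker=bigdata-cdh01:8032
queueName=default
oozie.use.system.libpath=true
# HDFS directory containing workflow.xml and lib/
oozie.wf.application.path=${nameNode}/user/root/oozie_works/examples/apps/hue-oozie-1646279075.31
```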
(3) yarn cluster mode
Build the workflow in Hue exactly as in the yarn client mode above, but set the application to run in yarn-cluster mode, then submit it. The job runs successfully.
(4) Schedule
Open the Scheduler page and build a scheduled task on top of the workflow, with a time-based trigger. Set the name and description, select the workflow, and define the cron expression for periodic execution (pay attention to the time zone selection).
2. Oozie Java Client API
Apache Oozie is a workflow scheduling system with the following features:
1) Workflows are scheduled as DAGs (Directed Acyclic Graphs).
2) Coordinator jobs can be triggered by time and by data-set availability.
3) It integrates with other Hadoop-ecosystem jobs such as MapReduce, Pig, Hive, Sqoop, and DistCp.
4) Scalability: an Oozie workflow launcher itself runs as a MapReduce job, but map-only, with no reduce phase.
5) Reliability: failed actions can be retried.
(1) Workflow Submit
Submit the SparkPi example to run on YARN (the code and workflow below use the client deploy mode); the relevant configuration and code are as follows:
package com.yyds.tags.oozie;

/**
 * Cluster endpoints used by the Oozie client examples below.
 */
public class OozieConstant {
    // HDFS NameNode address
    public static final String HDFSROOTPATH = "hdfs://192.168.42.7:8020";
    // Oozie server REST endpoint
    public static final String OOZIE_URL = "http://192.168.42.7:11000/oozie/";
    // YARN ResourceManager address
    public static final String jobTracker = "192.168.42.7:8032";
}
package com.yyds.tags.oozie;

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;
import java.util.Properties;

public class OozieWorkflowSubmit {
    public static void main(String[] args) throws OozieClientException, InterruptedException {
        // 1. Create the Oozie client against the server REST endpoint
        OozieClient oozieClient = new OozieClient(OozieConstant.OOZIE_URL);

        // 2. Build the job configuration (values are referenced as EL variables in workflow.xml)
        Properties jobConf = oozieClient.createConfiguration();
        jobConf.setProperty("oozie.use.system.libpath", "true");
        jobConf.setProperty("user.name", "root");
        jobConf.setProperty("oozie.libpath", OozieConstant.HDFSROOTPATH + "/user/root/share/lib/lib_20190723215106/spark2");
        jobConf.setProperty("nameNode", OozieConstant.HDFSROOTPATH);
        jobConf.setProperty("jobTracker", OozieConstant.jobTracker);
        jobConf.setProperty("queueName", "default");
        jobConf.setProperty("master", "yarn");
        jobConf.setProperty("mode", "client");
        jobConf.setProperty("sparkOptions",
                "--driver-memory 512m "
                + "--executor-memory 512m "
                + "--num-executors 1 "
                + "--executor-cores 1 "
                + "--conf spark.yarn.historyServer.address=http://192.168.42.7:18080 "
                + "--conf spark.eventLog.enabled=true "
                + "--conf spark.eventLog.dir=hdfs://192.168.42.7:8020/spark/eventLogs "
                + "--conf spark.yarn.jars=hdfs://192.168.42.7:8020/spark/jars/*");
        jobConf.setProperty("mainClass", "org.apache.spark.examples.SparkPi");
        jobConf.setProperty("appName", "SparkExamplePi");
        jobConf.setProperty("jarPath", OozieConstant.HDFSROOTPATH + "/user/root/oozie_works/cron-yarn_pi/lib/spark-examples_2.11-2.2.0.jar");
        jobConf.setProperty("appParam", "10");
        jobConf.setProperty(OozieClient.APP_PATH, OozieConstant.HDFSROOTPATH + "/user/root/oozie_works/cron_yarn_pi/workflow.xml");

        // TODO: 3. Submit the Oozie workflow; the call returns the submitted job's ID
        String jobId = oozieClient.run(jobConf);
        System.out.println("JobId = " + jobId);

        // TODO: 4. Poll the job status by ID until it leaves the RUNNING state
        while (oozieClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            System.out.println("Workflow job running ...");
            Thread.sleep(10 * 1000);
        }
        System.out.println("Workflow job completed ...");
    }
}
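The long hand-concatenated `sparkOptions` string above is easy to get wrong (one missing space silently breaks a `--conf` flag). One defensive option is to assemble the flags programmatically; `SparkOptsBuilder` below is a hypothetical helper sketched for illustration, not part of the Oozie client API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: assembles the sparkOptions string for the Spark action,
// keeping each flag separate so a missing space or broken "--conf" cannot slip in.
public class SparkOptsBuilder {

    public static String build(String driverMem, String executorMem,
                               int numExecutors, int executorCores,
                               Map<String, String> sparkConfs) {
        StringBuilder sb = new StringBuilder();
        sb.append("--driver-memory ").append(driverMem)
          .append(" --executor-memory ").append(executorMem)
          .append(" --num-executors ").append(numExecutors)
          .append(" --executor-cores ").append(executorCores);
        // Each entry becomes one "--conf key=value" pair, in insertion order
        for (Map.Entry<String, String> e : sparkConfs.entrySet()) {
            sb.append(" --conf ").append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> confs = new LinkedHashMap<>();
        confs.put("spark.eventLog.enabled", "true");
        System.out.println(build("512m", "512m", 1, 1, confs));
    }
}
```

The result can then be passed to `jobConf.setProperty("sparkOptions", ...)` in place of the concatenated literal.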
Note: the Spark application's dependency JAR and the workflow.xml file must be uploaded to the HDFS application directory beforehand. The workflow.xml content is the same as the Hue-generated workflow shown in section 1 (2) above. Keep in mind that properties such as master, mode, sparkOptions, mainClass, jarPath, and appParam set in the code only take effect if the corresponding workflow.xml elements reference them as EL variables (e.g. <mode>${mode}</mode>) rather than hard-coded values.
(2) Coordinator Submit
Modify the code above: add the schedule time settings and point the submission at the Coordinator configuration file, then submit. The code is as follows:
package com.yyds.tags.oozie;

import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import java.util.Properties;

public class OozieCoordinatorSubmit {
    public static void main(String[] args) throws OozieClientException, InterruptedException {
        // 1. Create the Oozie client against the server REST endpoint
        OozieClient oozieClient = new OozieClient(OozieConstant.OOZIE_URL);

        // 2. Build the job configuration
        Properties jobConf = oozieClient.createConfiguration();
        jobConf.setProperty("oozie.use.system.libpath", "true");
        jobConf.setProperty("user.name", "root");
        jobConf.setProperty("oozie.libpath", OozieConstant.HDFSROOTPATH + "/user/root/share/lib/lib_20190723215106/spark2");
        jobConf.setProperty("nameNode", OozieConstant.HDFSROOTPATH);
        jobConf.setProperty("jobTracker", OozieConstant.jobTracker);
        jobConf.setProperty("queueName", "default");
        jobConf.setProperty("master", "yarn");
        jobConf.setProperty("mode", "client");
        jobConf.setProperty("sparkOptions",
                "--driver-memory 512m "
                + "--executor-memory 512m "
                + "--num-executors 1 "
                + "--executor-cores 1 "
                + "--conf spark.yarn.historyServer.address=http://192.168.42.7:18080 "
                + "--conf spark.eventLog.enabled=true "
                + "--conf spark.eventLog.dir=hdfs://192.168.42.7:8020/spark/eventLogs "
                + "--conf spark.yarn.jars=hdfs://192.168.42.7:8020/spark/jars/*");
        jobConf.setProperty("mainClass", "org.apache.spark.examples.SparkPi");
        jobConf.setProperty("appName", "SparkExamplePi");
        jobConf.setProperty("jarPath", OozieConstant.HDFSROOTPATH + "/user/root/oozie_works/cron-yarn_pi/lib/spark-examples_2.11-2.2.0.jar");
        jobConf.setProperty("appParam", "10");
        // 2.4. Coordinator schedule parameters (times are UTC, note the trailing 'Z')
        jobConf.setProperty("start", "2022-03-05T17:42Z");
        jobConf.setProperty("freq", "0/3 * * * *");
        jobConf.setProperty("end", "2022-06-01T17:50Z");
        // 2.5. Oozie workflow parameters
        jobConf.setProperty("appPath", OozieConstant.HDFSROOTPATH + "/user/root/oozie_works/cron_yarn_pi");
        jobConf.setProperty(OozieClient.COORDINATOR_APP_PATH, OozieConstant.HDFSROOTPATH + "/user/root/oozie_works/cron_yarn_pi/coordinator.xml");

        // TODO: 3. Submit the Oozie coordinator; the call returns the coordinator job ID
        String jobId = oozieClient.run(jobConf);
        System.out.println("JobId = " + jobId);

        // TODO: 4. Poll the coordinator status by ID (use getCoordJobInfo, not getJobInfo,
        // for a coordinator ID). A coordinator stays RUNNING until its end time,
        // so in practice this loop would run for a long time.
        while (oozieClient.getCoordJobInfo(jobId).getStatus() == Job.Status.RUNNING) {
            System.out.println("Coordinator job running ...");
            Thread.sleep(10 * 1000);
        }
        System.out.println("Coordinator job completed ...");
    }
}
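The start and end times above must be given in UTC with a trailing `Z`, which is easy to get wrong when the cluster operates in a local time zone (the same pitfall the Hue scheduler's time zone selector guards against). A minimal conversion sketch using java.time; `OozieTimeUtil` is a hypothetical helper name, not part of the Oozie client:

```java
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class OozieTimeUtil {
    // Oozie expects times like 2022-03-05T17:42Z (UTC, minute precision)
    private static final DateTimeFormatter OOZIE_FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm'Z'");

    // Convert a zoned local time to the UTC string Oozie expects
    public static String toOozieUtc(ZonedDateTime local) {
        return local.withZoneSameInstant(ZoneOffset.UTC).format(OOZIE_FMT);
    }

    public static void main(String[] args) {
        ZonedDateTime shanghai =
                ZonedDateTime.of(2022, 3, 6, 1, 42, 0, 0, ZoneId.of("Asia/Shanghai"));
        // Asia/Shanghai is UTC+8, so this prints 2022-03-05T17:42Z
        System.out.println(toOozieUtc(shanghai));
    }
}
```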
Coordinator configuration file (the EL variables match the properties set in the Java code above):
<coordinator-app name="schedule_yarn_pi"
                 frequency="${freq}"
                 start="${start}" end="${end}" timezone="Asia/Shanghai"
                 xmlns="uri:oozie:coordinator:0.2">
    <controls>
        <execution>FIFO</execution>
    </controls>
    <action>
        <workflow>
            <app-path>${appPath}</app-path>
            <configuration>
                <property>
                    <name>oozie.use.system.libpath</name>
                    <value>true</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>