看本文章前请先看其他任务节点和数据源以及SPI部分的具体实现源码,方便理解,否则无效果
打开源码,可看到任务节点中无hive,如有需要,我们自己开发。
1、task-plugin下创建hive的maven 2、Pom 参考其他的任务节点,接下来继续开发task对应的channel、factory、task、parameters。
3、创建Task通道工厂HiveClientTaskChannelFactory
首先我们需要创建任务服务的工厂,其主要作用是帮助构建 TaskChannel 以及 TaskPlugin 参数,同时给出该任务的唯一标识,ChannelFactory 在 Apache DolphinScheduler 的 Task 服务组中,其作用属于是在任务组中的承上启下,交互前后端以及帮助 Worker 构建 TaskChannel。
package org.dolphinscheduler.plugin.task.hive;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
import org.apache.dolphinscheduler.spi.params.checkbox.CheckboxParam;
import org.apache.dolphinscheduler.spi.params.input.InputParam;
import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
import java.util.ArrayList;
import java.util.List;
public class HiveClientTaskChannelFactory implements TaskChannelFactory {
@Override
public String getName() {
return "HIVE CLIENT";
}
@Override
public List<PluginParams> getParams() {
List<PluginParams> pluginParams = new ArrayList<>();
InputParam nodeName = InputParam.newBuilder("name", "$t('Node name')")
.addValidate(Validate.newBuilder()
.setRequired(true)
.build())
.build();
PluginParams runFlag = RadioParam.newBuilder("runFlag", "RUN_FLAG")
.addParamsOptions(new ParamsOptions("NORMAL", "NORMAL", false))
.addParamsOptions(new ParamsOptions("FORBIDDEN", "FORBIDDEN", false))
.build();
PluginParams build = CheckboxParam.newBuilder("Hive SQL", "Test HiveSQL")
.setDisplay(true)
.setValue("-- author: \n --desc:")
.build();
pluginParams.add(nodeName);
pluginParams.add(runFlag);
pluginParams.add(build);
return pluginParams;
}
@Override
public TaskChannel create() {
return new HiveClientTaskChannel();
}
}
4、创建channel
有了工厂之后,我们会根据工厂创建出 TaskChannel,TaskChannel 包含如下两个方法,一个是取消,一个是创建,目前不需要关注取消,主要关注创建任务。
package org.dolphinscheduler.plugin.task.hive;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
public class HiveClientTaskChannel implements TaskChannel {
@Override
public void cancelApplication(boolean status) {
}
@Override
public AbstractTask createTask(TaskRequest taskRequest) {
return new HiveClientTask(taskRequest);
}
}
5、创建parameters
通过 TaskChannel 我们得到了可执行的物理 Task,但是我们需要给当前 Task 添加相应的实现,才能够让Apache DolphinScheduler 去执行你的任务,首先在编写 Task 之前我们需要先了解一下 Task 之间的关系: 通过上图我们可以看到,基于 Yarn 执行任务的 Task 都会去继承 AbstractYarnTask,不需要经过 Yarn 执行的都会去直接继承 AbstractTaskExecutor,主要是包含一个 AppID,以及 CanalApplication setMainJar 之类的方法,想知道的小伙伴可以自己去深入研究一下,如上可知我们实现的 HiveClient 就需要继承 AbstractYarnTask,在构建 Task 之前,我们需要构建一下适配 HiveClient 的 Parameters 对象用来反序列化JsonParam。
package org.dolphinscheduler.plugin.task.hive;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.ResourceInfo;
import java.util.List;
public class HiveClientParameters extends AbstractParameters {
private String sql;
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
@Override
public boolean checkParameters() {
return sql != null;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
return null;
}
}
6、创建Task 实现了 Parameters 对象之后,我们具体实现 Task,例子中的实现比较简单,就是将用户的参数写入到文件中,通过 Hive -f 去执行任务。
package org.dolphinscheduler.plugin.task.hive;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
public class HiveClientTask extends AbstractYarnTask {
private HiveClientParameters hiveClientParameters;
private final TaskRequest taskExecutionContext;
public HiveClientTask(TaskRequest taskRequest) {
super(taskRequest);
this.taskExecutionContext = taskRequest;
}
@Override
public void init() {
logger.info("hive client task param is {}", JSONUtils.toJsonString(taskExecutionContext));
this.hiveClientParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), HiveClientParameters.class);
if (this.hiveClientParameters != null && !hiveClientParameters.checkParameters()) {
throw new RuntimeException("hive client task params is not valid");
}
}
@Override
protected String buildCommand() {
String filePath = getFilePath();
if (writeExecutionContentToFile(filePath)) {
return "hive -f " + filePath;
}
return null;
}
private String getFilePath() {
return String.format("%s/hive-%s-%s.sql", this.taskExecutionContext.getExecutePath(), this.taskExecutionContext.getTaskName(), this.taskExecutionContext.getTaskInstanceId());
}
@Override
protected void setMainJarName() {
}
private boolean writeExecutionContentToFile(String filePath) {
Path path = Paths.get(filePath);
try (BufferedWriter writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)) {
writer.write(this.hiveClientParameters.getSql());
logger.info("file:" + filePath + "write success.");
return true;
} catch (IOException e) {
logger.error("file:" + filePath + "write failed.please path auth.");
e.printStackTrace();
return false;
}
}
@Override
public AbstractParameters getParameters() {
return this.hiveClientParameters;
}
}
到这里,四个关键的类就已经创建好了 将项目通过maven打包后,运行,查看worker的日志,可看到有相应hive plugin日志
参考资料 https://mp.weixin.qq.com/s/nmbazSl-UW4SMivbS6E_6Q
|