Background
We're upgrading a CDH cluster to a CDP cluster. Most of the production data is Hive data, with a small portion in Kudu.
Hive data can be migrated with hadoop distcp.
But what about the Kudu data? Kudu stores its data on the tablet servers' local disks, not on HDFS, so we need a tool that reads the Kudu tables on CDH and writes them into the Kudu tables on CDP.
There are plenty of good tools for this, but sticking with what I know best, I chose DataX and wrote the Kudu plugin myself (this post builds the reader).
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>${kudu.version}</version>
<!--
<scope>system</scope>
<systemPath>${basedir}/libs/kudu-client-1.10.0-cdh6.3.3.jar</systemPath> -->
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
kudu-client obviously has to be there; hadoop-common is needed for the Kerberos authentication (UserGroupInformation).
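For completeness, the two version properties referenced above have to be defined somewhere in the pom. The values below are assumptions, inferred from the commented-out kudu-client-1.10.0-cdh6.3.3.jar and the CDH 6.3.3 release line:
<properties>
    <!-- assumed versions, matching the CDH 6.3.3 cluster being migrated -->
    <kudu.version>1.10.0-cdh6.3.3</kudu.version>
    <hadoop.version>3.0.0-cdh6.3.3</hadoop.version>
</properties>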
Creating a new reader
Step one is to think about what this kudureader's job JSON should look like. What do we actually need from Kudu?
Create a connection -> kuduMaster
Open a table -> table
Pick columns -> column
Filter the source data -> where
Kerberos authentication -> keytab + principal
So let's first draft the job JSON:
{
"job": {
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "kudureader",
"parameter": {
"haveKerberos": "true",
"kerberosKeytabFilePath": "/data//share/keytab/hive.keytab",
"kerberosPrincipal": "hive@CDH.COM",
"where": {
"column": "id",
"compare": "<=",
"value": "13"
},
"columns": [
"*"
],
"connection": {
"kuduMaster": "node02.data.com,node03.data.com,master.data.com",
"table": "default.cc_test_kd3"
}
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"path": "/data/cc_test/kuduwriter",
"fileName": "result",
"writeMode": "truncate",
"dateFormat": "yyyy-MM-dd"
}
}
}
]
}
}
connection is just a single conf object, not the mysql/oracle style list that exists for sharded databases and tables. There is normally exactly one connection; how many companies really run several Kudu clusters...
"connection": { "kuduMaster": "node02.data.com,node03.data.com,master.data.com", "table": "default.cc_test_kd3" }
column supports *; when you pass *, the reader expands it to all fields from the Kudu table's schema: "columns": ["*"]
where. Kudu isn't JDBC, so you can't just splice the condition into a SQL string. This was the hardest piece to design, mostly in deciding what to call these three fields:
"where": { "column": "id", "compare": "<=", "value": "13" },
Kerberos needs no explanation; fill these in if your cluster uses it, and leave them out otherwise.
"haveKerberos": "true",
"kerberosKeytabFilePath": "/data//share/keytab/hive.keytab",
"kerberosPrincipal":"hive@CDH.COM",
I've actually left out quite a bit here, e.g. querySql and splitPk, which both exist to split one job into multiple tasks. You can't get everything perfect in one sitting; there's always the next iteration (see the sketch below for one way to split).
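If splitting ever becomes worth the effort, Kudu can do the partitioning for us: the client hands out one scan token per tablet, and each serialized token can travel inside a task's Configuration. A sketch only; the "scanToken" key is a made-up name, and kuduMaster/tableName are assumed to have been pulled out of originalConfig beforehand:
// needs java.util.{ArrayList, Base64}, java.io.IOException and org.apache.kudu.client.*
@Override
public List<Configuration> split(int mandatoryNumber) {
    List<Configuration> splits = new ArrayList<>();
    KuduClient client = new KuduClient.KuduClientBuilder(kuduMaster).build();
    try {
        KuduTable table = client.openTable(tableName);
        // one token per tablet; the task side rebuilds its scanner with
        // KuduScanToken.deserializeIntoScanner(bytes, client)
        for (KuduScanToken token : client.newScanTokenBuilder(table).build()) {
            Configuration conf = originalConfig.clone();
            conf.set("scanToken", Base64.getEncoder().encodeToString(token.serialize()));
            splits.add(conf);
        }
    } catch (IOException e) {
        throw DataXException.asDataXException(DBUtilErrorCode.CONN_DB_ERROR, e);
    } finally {
        try { client.close(); } catch (KuduException ignored) { }
    }
    return splits;
}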
The DataX framework itself is already well written; all we need is
public class KuduReader extends Reader
and, inside it,
public static class Job extends Reader.Job
public static class Task extends Reader.Task
plus a handful of methods. I'd suggest starting with the Kudu connection and query logic; a quick connectivity smoke test (below) is worth running before wiring anything into the plugin.
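A minimal standalone check against the source cluster, using only stock kudu-client calls; the master addresses are the ones from the job JSON above:
import org.apache.kudu.client.KuduClient;

public class KuduSmokeTest {
    public static void main(String[] args) throws Exception {
        KuduClient client = new KuduClient.KuduClientBuilder(
                "node02.data.com,node03.data.com,master.data.com").build();
        try {
            // prints every table name the cluster knows about
            System.out.println(client.getTablesList().getTablesList());
        } finally {
            client.close();
        }
    }
}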
public class KuduConstant {
public final static String CONNECTION="connection";
public final static String KUDU_MASTER="kuduMaster";
public final static String TABLE="table";
public final static String COLUMNS="columns";
public final static String WHERE="where";
public final static String COLUMN="column";
public final static String COMPARE="compare";
public final static String CONDITION="condition";
public final static String TYPE="type";
public final static String VALUE="value";
}
import com.google.common.collect.Lists;
import com.tencent.s2.dataingestion.common.element.Record;
import com.tencent.s2.dataingestion.common.element.StringColumn;
import com.tencent.s2.dataingestion.common.exception.DataXException;
import com.tencent.s2.dataingestion.common.plugin.RecordSender;
import com.tencent.s2.dataingestion.common.plugin.TaskPluginCollector;
import com.tencent.s2.dataingestion.common.spi.ErrorCode;
import com.tencent.s2.dataingestion.common.spi.Reader;
import com.tencent.s2.dataingestion.common.util.Configuration;
import com.tencent.s2.dataingestion.plugin.rdbms.util.DBUtilErrorCode;
import com.tencent.s2.dataingestion.plugin.rdbms.writer.Key;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.nio.file.Paths;
import java.util.List;
import static org.apache.kudu.Type.*;
//TODO writeProxy
public class KuduReader extends Reader {
// private static Logger logger = LoggerFactory.getLogger(KuduReader.class);
public static class Job extends Reader.Job {
private static final Logger LOG = LoggerFactory
.getLogger(Job.class);
private Configuration originalConfig = null;
private String kerberosPrincipal;
private String kerberosKeytabFilePath;
private boolean haveKerberos;
@Override
public void init() {
LOG.info("kudureader init ");
this.originalConfig = super.getPluginJobConf();
validate();
}
private void validate() {
LOG.debug("After job init(), originalConfig now is:[\n{}\n]",
originalConfig.toJSON());
this.haveKerberos = originalConfig.getBool(Key.HAVE_KERBEROS, false);
this.kerberosKeytabFilePath = originalConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
this.kerberosPrincipal = originalConfig.getString(Key.KERBEROS_PRINCIPAL);
LOG.info("kerberosKeytabFilePath: {}, kerberosPrincipal: {}", kerberosKeytabFilePath, kerberosPrincipal);
if (haveKerberos) {
if (StringUtils.isNotBlank(this.kerberosPrincipal) && StringUtils.isNotBlank(this.kerberosKeytabFilePath) && Paths.get(this.kerberosKeytabFilePath).toFile().exists()) {
this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);
LOG.info("************ kerberosKeytabFilePath: {}, kerberosPrincipal: {} *************", kerberosKeytabFilePath, kerberosPrincipal);
} else {
LOG.error("kerberos enabled but keytab file or principal is invalid. kerberosKeytabFilePath: {}, kerberosPrincipal: {}", kerberosKeytabFilePath, kerberosPrincipal);
}
}
}
private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath) {
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
if (StringUtils.isNotBlank(this.kerberosPrincipal) && StringUtils.isNotBlank(this.kerberosKeytabFilePath)) {
UserGroupInformation.setConfiguration(new org.apache.hadoop.conf.Configuration());
try {
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
} catch (Exception e) {
String message = String.format("kerberos认证失败,请确定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填写正确",
kerberosKeytabFilePath, kerberosPrincipal);
LOG.error(message);
throw DataXException.asDataXException(DBUtilErrorCode.NO_INSERT_PRIVILEGE, e);
}
}
}
// Generally pre() should be deferred to the task (the single-table case is the exception)
@Override
public List<Configuration> split(int mandatoryNumber) {
LOG.info("kudu no need to split ");
return Lists.newArrayList(originalConfig);
}
// Likewise, post() would generally be deferred to the task (single-table case excepted)
@Override
public void post() {
LOG.info("kudu reader post do nothing");
}
@Override
public void destroy() {
LOG.info("kudu reader destroy do nothing");
}
}
public static class Task extends Reader.Task {
private Configuration readerSliceConfig;
private Logger LOG = LoggerFactory.getLogger(Task.class);
KuduOperator kuduOperator;
@Override
public void init() {
readerSliceConfig = super.getPluginJobConf();
kuduOperator = new KuduOperator(readerSliceConfig);
}
@Override
public void destroy() {
kuduOperator.close();
}
@Override
public void startRead(RecordSender recordSender) {
LOG.info("kuduTask start read ");
TaskPluginCollector taskPluginCollector = super.getTaskPluginCollector();
kuduOperator.startRead(recordSender,taskPluginCollector);
}
}
public static class KuduOperator {
private Logger LOG = LoggerFactory.getLogger(KuduOperator.class);
//the kudu settings parsed from the job json
private Configuration configuration;
//every kudu operation needs a client
private KuduClient kuduClient;
//the table being read
private KuduTable kuduTable;
//the session we open
private KuduSession session;
//required to reach the kudu cluster
private String kuduMaster;
//builder for the scanner that queries kudu
private KuduScanner.KuduScannerBuilder kuduScannerBuilder;
//schema of the opened table
private Schema tableSchema;
//the where config; whether it lives here as a field or not makes little difference
private Configuration where;
private String tableName;
private List<String> columns;
//the constructor takes the conf; from then on a KuduOperator can do whatever it needs
public KuduOperator(Configuration configuration) {
this.configuration=configuration;
LOG.info("KuduOperator check param");
//check params; strictly optional, but it looks more professional
checkParam();
//initialize the connection
init();
}
//check the required params: kuduMaster, table, columns
public void checkParam(){
this.columns = configuration.getList(KuduConstant.COLUMNS, String.class);
//where is optional; read it here so startRead can simply check for null
this.where = configuration.getConfiguration(KuduConstant.WHERE);
Configuration connectionInfo = configuration.getConfiguration(KuduConstant.CONNECTION);
this.kuduMaster = connectionInfo.getNecessaryValue(KuduConstant.KUDU_MASTER,KuduReaderErrorCode.KUDU_PARAM_NECESSARY);
this.tableName = connectionInfo.getNecessaryValue(KuduConstant.TABLE,KuduReaderErrorCode.KUDU_PARAM_NECESSARY);
LOG.info("columns ={},where ={},kuduMaster={}, tableName={}",columns,where,kuduMaster,tableName);
}
//create the client, session, table handle and scanner builder
public void init() {
LOG.info("KuduOperator init ");
kuduClient = new KuduClient.KuduClientBuilder(kuduMaster).build();
try {
session = kuduClient.newSession();
//session set something
kuduTable = kuduClient.openTable(tableName);
tableSchema=kuduTable.getSchema();
kuduScannerBuilder = kuduClient.newScannerBuilder(kuduTable);
} catch (KuduException e) {
LOG.error("failed to open kudu table {}", tableName, e);
throw DataXException.asDataXException(DBUtilErrorCode.CONN_DB_ERROR, e);
}
}
//the all-important query: the whole point of a reader. recordSender is the bridge that carries rows from reader to writer.
public void startRead(RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
//apply the where filter if present
if (where!=null){
handleWhere(kuduScannerBuilder);
}
//expand columns = *
if ("*".equals(columns.get(0))) {
columns = getColumnNamesFromSchema(tableSchema);
LOG.info("columns=* , defaulting to all columns, columns={}",columns);
}
KuduScanner scanner = kuduScannerBuilder
.setProjectedColumnNames(columns)
.build();
try {
while (scanner.hasMoreRows()) {
RowResultIterator rowResults = scanner.nextRows();
while (rowResults.hasNext()) {
RowResult res = rowResults.next();
Record record = recordSender.createRecord();
for (String column : columns) {
//our kudu tables are simple, so every type is shipped as a string for now (see the type-aware sketch after the class)
record.addColumn(new StringColumn(String.valueOf(res.getObject(column))));
}
recordSender.sendToWriter(record);
}
}
scanner.close();
} catch (KuduException e) {
LOG.error("kudu read error: {}", e.getMessage(), e);
}
}
public List<String> getColumnNamesFromSchema(Schema schema) {
List<ColumnSchema> columns = schema.getColumns();
List<String> columnNames = Lists.newArrayList();
for (ColumnSchema column : columns) {
Type type = column.getType();
//e.g. name->string, age->int32
LOG.debug("{} -> {}", column.getName(), type.getName());
columnNames.add(column.getName());
}
return columnNames;
}
public void close() {
if (session != null) {
try {
session.close();
LOG.info("session close success");
} catch (KuduException e) {
LOG.error("session close failed", e);
}
}
if (kuduClient != null) {
try {
kuduClient.close();
LOG.info("kuduClient close success");
} catch (KuduException e) {
LOG.error("kuduClient close failed", e);
}
}
}
//multiple where predicates (with and/or) could be supported here, but that gets messy; extend it when actually needed
//e.g. id > "1"
public void handleWhere(KuduScanner.KuduScannerBuilder kuduScannerBuilder) {
String column= where.getNecessaryValue(KuduConstant.COLUMN, KuduReaderErrorCode.KUDU_PARAM_NECESSARY).trim();
String compare= where.getNecessaryValue(KuduConstant.COMPARE,KuduReaderErrorCode.KUDU_PARAM_NECESSARY).trim();
String value = where.getNecessaryValue(KuduConstant.VALUE, KuduReaderErrorCode.KUDU_PARAM_NECESSARY).trim();
ColumnSchema schemaColumn = tableSchema.getColumn(column);
KuduPredicate.ComparisonOp comparisonOp ;
switch (compare){
case ">" :
comparisonOp = KuduPredicate.ComparisonOp.GREATER;
break;
case ">=" :
comparisonOp = KuduPredicate.ComparisonOp.GREATER_EQUAL;
break;
case "<" :
comparisonOp = KuduPredicate.ComparisonOp.LESS;
break;
case "<=" :
comparisonOp = KuduPredicate.ComparisonOp.LESS_EQUAL;
break;
case "=" :
comparisonOp = KuduPredicate.ComparisonOp.EQUAL;
break;
default:
throw new IllegalStateException("Unexpected value: " + compare);
}
Object row;
switch (schemaColumn.getType()) {
case INT8:
row = Byte.parseByte(value);
break;
case INT16:
row = Short.parseShort(value);
break;
case INT32:
row = Integer.parseInt(value);
break;
case INT64:
row = Long.parseLong(value);
break;
case UNIXTIME_MICROS:
row = Long.parseLong(value);
break;
case STRING:
row = value;
break;
case DECIMAL:
row=new BigDecimal(value);
break;
case BOOL:
row=Boolean.valueOf(value);
break;
case DOUBLE:
row=Double.valueOf(value);
break;
case FLOAT:
row=Float.valueOf(value);
break;
default:
LOG.warn("got unknown type for column '{}' -- ignoring this column",
schemaColumn);
throw DataXException.asDataXException(KuduReaderErrorCode.KUDU_COLUMN_TYPE_NOT_SUPPORT,"column type not supported yet: "+schemaColumn);
}
LOG.info("schemaColumn={},comparisonOp={},row={}",schemaColumn,comparisonOp,row);
KuduPredicate predicate = KuduPredicate.newComparisonPredicate(schemaColumn, comparisonOp, row);
kuduScannerBuilder.addPredicate(predicate);
}
}
}
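Shipping everything as a StringColumn is fine for simple tables, but a type-aware mapping keeps numbers and booleans intact for the writer. A sketch that could replace the loop body in startRead, assuming this DataX fork ships the same column types as stock DataX (LongColumn, DoubleColumn, BoolColumn, BytesColumn):
// sketch: map each kudu column to the closest DataX column type
private Column buildColumn(RowResult res, String name) {
    if (res.isNull(name)) {
        return new StringColumn(null); // DataX columns tolerate null payloads
    }
    switch (res.getColumnType(name)) {
        case INT8:   return new LongColumn((long) res.getByte(name));
        case INT16:  return new LongColumn((long) res.getShort(name));
        case INT32:  return new LongColumn((long) res.getInt(name));
        case INT64:
        case UNIXTIME_MICROS:
                     return new LongColumn(res.getLong(name));
        case FLOAT:  return new DoubleColumn((double) res.getFloat(name));
        case DOUBLE: return new DoubleColumn(res.getDouble(name));
        case BOOL:   return new BoolColumn(res.getBoolean(name));
        case BINARY: return new BytesColumn(res.getBinaryCopy(name));
        case DECIMAL:return new StringColumn(res.getDecimal(name).toPlainString());
        case STRING:
        default:     return new StringColumn(res.getString(name));
    }
}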
import com.tencent.s2.dataingestion.common.spi.ErrorCode;
public enum KuduReaderErrorCode implements ErrorCode {
KUDU_PARAM_NECESSARY("KUDU_ErrCode-1","required parameter is missing"),
KUDU_COLUMN_TYPE_NOT_SUPPORT("KUDU_ErrCode-2","kudu column type not supported yet"),
KUDU_ONLY_ONE_CONNECTION("KUDU_ErrCode-3","kuduReader accepts only one kuduConnection");
private final String code;
private final String describe;
KuduReaderErrorCode(String code, String describe) {
this.code = code;
this.describe = describe;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.describe;
}
@Override
public String toString() {
return String.format("Code:[%s], Describe:[%s]. ", this.code,
this.describe);
}
}
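One last piece of wiring DataX needs that isn't shown above: every plugin directory carries a plugin.json telling the framework which class to load. A sketch; the package name here is an assumption, since the reader's own package never appears in the imports:
{
    "name": "kudureader",
    "class": "com.tencent.s2.dataingestion.plugin.reader.kudureader.KuduReader",
    "description": "read data from kudu and send it downstream as records",
    "developer": "..."
}
With the jar and this file dropped under plugin/reader/kudureader, the job JSON at the top runs through the usual datax.py entry point.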