IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> datax自定义reader->kuduReader -> 正文阅读

[大数据]datax自定义reader->kuduReader

背景。

cdh集群升级到cdp集群,生产环境数据大多都是hive数据,少部分是kudu的数据,

hive的数据可以通过hadoop ditcp来转义

那么kudu的数据呢?因为kudu 的数据是存在服务器上的,并不是在hdfs的,所以我们需要一个工具读取cdh的kudu表,然后写到cdp的kudu表。

其实优秀的工具有很多,但是本着熟悉的关系还是选择了datax,自己写个kuduWriter。

 <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>${kudu.version}</version>
            <!--
            <scope>system</scope>
            <systemPath>${basedir}/libs/kudu-client-1.10.0-cdh6.3.3.jar</systemPath> -->
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

kudu-client 不用说肯定要的, hadoop-common 是需要kerberos认证.

新建一个reader

第一步就是要构想这个kudureader的json以后怎么写?

举例 我们需要kudu做啥?

创建连接——>kuduMaster

获取表——>table

获取列——>column

原表数据过滤——> where

kerberos认证——> keytab princple

所以我们需要先编写一个

?

{
  "job": {
    "setting": {
      "speed": {
        "channel": 10
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "kudureader",
          "parameter": {
            "haveKerberos": "true",
            "kerberosKeytabFilePath": "/data//share/keytab/hive.keytab",
            "kerberosPrincipal": "hive@CDH.COM",
            "where": {
              "column": "id",
              "compare": "<=",
              "value": "13"
            },
            "columns": [
              "*"
            ],
            "connection": {
              "kuduMaster": "node02.data.com,node03.data.com,master.data.com",
              "table": "default.cc_test_kd3"
            }
          }
        },
        "writer": {
          "name": "txtfilewriter",
          "parameter": {
            "path": "/data/cc_test/kuduwriter",
            "fileName": "result",
            "writeMode": "truncate",
            "dateFormat": "yyyy-MM-dd"
          }
        }
      }
    ]
  }
}

?connection 直接就是一个conf 不是mysql oracle那种可以多个连接有分库分表的情况,一般只有一个连接,哪个公司还有好几个集群....

"connection": { "kuduMaster": "node02.data.com,node03.data.com,master.data.com", "table": "default.cc_test_kd3" }

column 可以支持*,写*的时候会根据kudutable获取所有字段
"columns": ["*"]

where。因为kudu不是jdbc 直接拼sql就行。这个最难写,在于怎么给这三个字段取名。。

"where": { "column": "id", "compare": "<=", "value": "13" },

kerberos 这个就不用说,随便写下就好,没有的可以不写。

"haveKerberos": "true",

"kerberosKeytabFilePath": "/data//share/keytab/hive.keytab",

"kerberosPrincipal":"hive@CDH.COM",

其实我这里少些了很多 比如querySql 和splipk,这两个是为了把job分为多个task的,懒得写。总不能一次写好了后面就躺平把,总还要不断优化的。。。

其实写datax已经写的很好了,我们只需要

public class KuduReader extends Reader 

在其内部

public static class Job extends Reader.Job 
public static class Task extends Reader.Task

然后写几个方法就好。

其实先建议写 kudu的连接和query方法。

public  class KuduConstant {
    public final static String CONNECTION="connection";
    public final static String KUDU_MASTER="kuduMaster";
    public final static String TABLE="table";
    public final static String COLUMNS="columns";
    public final static String WHERE="where";
    public final static String COLUMN="column";
    public final static String COMPARE="compare";
    public final static String CONDITION="condition";
    public final static String TYPE="type";
    public final static String VALUE="value";


}

?

import com.google.common.collect.Lists;
import com.tencent.s2.dataingestion.common.element.Record;
import com.tencent.s2.dataingestion.common.element.StringColumn;
import com.tencent.s2.dataingestion.common.exception.DataXException;
import com.tencent.s2.dataingestion.common.plugin.RecordSender;
import com.tencent.s2.dataingestion.common.plugin.TaskPluginCollector;
import com.tencent.s2.dataingestion.common.spi.ErrorCode;
import com.tencent.s2.dataingestion.common.spi.Reader;
import com.tencent.s2.dataingestion.common.util.Configuration;
import com.tencent.s2.dataingestion.plugin.rdbms.util.DBUtilErrorCode;
import com.tencent.s2.dataingestion.plugin.rdbms.writer.Key;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.nio.file.Paths;
import java.util.List;

import static org.apache.kudu.Type.*;


//TODO writeProxy
public class KuduReader extends Reader {
//    private static Logger  logger = LoggerFactory.getLogger(KuduReader.class);

    public static class Job extends Reader.Job {
        private static final Logger LOG = LoggerFactory
                .getLogger(Job.class);
        private Configuration originalConfig = null;
        private String kerberosPrincipal;
        private String kerberosKeytabFilePath;
        private boolean haveKerberos;


        @Override
        public void init() {
            LOG.info("kudureader init ");
            this.originalConfig = super.getPluginJobConf();
            validate();
        }

        private void validate() {
            LOG.debug("After job init(), originalConfig now is:[\n{}\n]",
                    originalConfig.toJSON());
            this.haveKerberos = originalConfig.getBool(Key.HAVE_KERBEROS);
            this.kerberosKeytabFilePath = originalConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
            this.kerberosPrincipal = originalConfig.getString(Key.KERBEROS_PRINCIPAL);
            LOG.info("kerberosKeytabFilePath: {}, kerberosPrincipal: {}", kerberosKeytabFilePath, kerberosPrincipal);
            if (haveKerberos) {
                if (StringUtils.isNotBlank(this.kerberosPrincipal) && Paths.get(this.kerberosKeytabFilePath).toFile().exists()) {
                    this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);
                    LOG.info("************ kerberosKeytabFilePath: {}, kerberosPrincipal: {} *************", kerberosKeytabFilePath, kerberosPrincipal);
                } else {
                    LOG.error("kerberosKeytabFilePath: {}, kerberosPrincipal: {}", kerberosKeytabFilePath, kerberosPrincipal);
                }
            }
        }

        private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath) {
            System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
            if (StringUtils.isNotBlank(this.kerberosPrincipal) && StringUtils.isNotBlank(this.kerberosKeytabFilePath)) {
                UserGroupInformation.setConfiguration(new org.apache.hadoop.conf.Configuration());
                try {
                    UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
                } catch (Exception e) {
                    String message = String.format("kerberos认证失败,请确定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填写正确",
                            kerberosKeytabFilePath, kerberosPrincipal);
                    LOG.error(message);
                    throw DataXException.asDataXException(DBUtilErrorCode.NO_INSERT_PRIVILEGE, e);
                }
            }
        }
        // 一般来说,是需要推迟到 task 中进行pre 的执行(单表情况例外)

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            LOG.info("kudu no need to split ");
            return Lists.newArrayList(originalConfig);
        }

        // 一般来说,是需要推迟到 task 中进行post 的执行(单表情况例外)
        @Override
        public void post() {
            LOG.info("kudu reader post do nothing");
        }

        @Override
        public void destroy() {
            LOG.info("kudu reader destroy do nothing");
        }

    }

    public static class Task extends Reader.Task {
        private Configuration readerSliceConfig;
        private Logger LOG = LoggerFactory.getLogger(Task.class);
        KuduOperator kuduOperator;

        @Override
        public void init() {
            readerSliceConfig = super.getPluginJobConf();
            kuduOperator = new KuduOperator(readerSliceConfig);

        }

        @Override
        public void destroy() {
            kuduOperator.close();
        }

        @Override
        public void startRead(RecordSender recordSender) {
            LOG.info("kuduTask start read ");
            TaskPluginCollector taskPluginCollector = super.getTaskPluginCollector();
            kuduOperator.startRead(recordSender,taskPluginCollector);
        }
    }

    public static class KuduOperator {
        private Logger LOG = LoggerFactory.getLogger(KuduOperator.class);
        //从json里获取的kudu的配置
        private Configuration configuration;
        //操作kudu 肯定要一个client
        private KuduClient kuduClient;
        //操作的哪张表
        private KuduTable kuduTable;
        //开启的会话
        private KuduSession session;
        //连接kudu的必须
        private String kuduMaster;
        //查询kudu的scanner
        private KuduScanner.KuduScannerBuilder kuduScannerBuilder;
        //获取的表的schema
        private Schema tableSchema;
        //获取where的配置,其实写不写在这里都行,没啥区别
        private Configuration where;
        private String tableName;
        private List<String> columns;
        
        //根据构造器传入conf,此后的KuduOperator对象即可想干嘛干嘛
        public KuduOperator(Configuration configuration) {
           this.configuration=configuration;
           LOG.info("KuduOperator check param");
            //检查参数 其实不检查也行,看着专业点
            checkParam();
            //初始化连接
            init();
        }
        //检查必须的参数 比如 kuduMaster connection table column
        public void checkParam(){
            this.columns = configuration.getList(KuduConstant.COLUMNS, String.class);
            Configuration connectionInfo = configuration.getConfiguration(KuduConstant.CONNECTION);
            this.kuduMaster = connectionInfo.getNecessaryValue(KuduConstant.KUDU_MASTER,KuduReaderErrorCode.KUDU_PARAM_NECESSARY);
            this.tableName = connectionInfo.getNecessaryValue(KuduConstant.TABLE,KuduReaderErrorCode.KUDU_PARAM_NECESSARY);
            LOG.info("columns ={},where ={},kuduMaster={}, tableName={}",columns,where,kuduMaster,tableName);
        }
    
        //对开始创建各种连接
        public void init() {
            LOG.info("KuduOperator init ");
            kuduClient = new KuduClient.KuduClientBuilder(kuduMaster).build();
            try {
                session = kuduClient.newSession();
                //session set something
                kuduTable = kuduClient.openTable(tableName);
                tableSchema=kuduTable.getSchema();
                kuduScannerBuilder = kuduClient.newScannerBuilder(kuduTable);
            } catch (KuduException e) {
                e.printStackTrace();
            }
        }
        

        //最重要的query 我们写reader的目的就是query,recordSender是连接reader和writer的桥梁。
        public void startRead(RecordSender recordSender, TaskPluginCollector taskPluginCollector)  {
            //处理where
            if (where!=null){
                handleWhere(kuduScannerBuilder);
            }
            //处理 column =*
            if ("*".equals(columns.get(0))) {
                    columns = getColumnNamesFromSchema(tableSchema);
                    LOG.info("columns=* ,默认为读所有列,columns={}",columns);
            }
            KuduScanner scanner = kuduScannerBuilder
                    .setProjectedColumnNames(columns)
                    .build();
            try {
                while (scanner.hasMoreRows()) {
                    RowResultIterator rowResults = scanner.nextRows();
                    while (rowResults.hasNext()) {
                        RowResult res = rowResults.next();
                        Record record = recordSender.createRecord();
                        for (String column : columns) {
                            //因为我们kudu比较简单,直接就用string代替其他类型了
                            record.addColumn(new StringColumn(String.valueOf(res.getObject(column))));
                        }
                        recordSender.sendToWriter(record);
                    }
                }
                scanner.close();
            } catch (KuduException e) {
                LOG.error("kudu read error ," +e.getMessage());
                e.printStackTrace();
            }
        }

        public List<String> getColumnNamesFromSchema(Schema schema) {
            List<ColumnSchema> columns = schema.getColumns();
            List<String> columnNames = Lists.newArrayList();
            for (ColumnSchema column : columns) {
                Type type = column.getType();
                System.out.println(column.getName() + "->" + type.getName());
                /**
                 * name->string
                 * age->int32
                 */
                columnNames.add(column.getName());
            }
            return columnNames;
        }

        public void close() {
            if (session != null) {
                try {
                    session.close();
                    LOG.info("session close success");
                } catch (KuduException e) {
                    e.printStackTrace();
                }
            }
            if (kuduClient != null) {
                try {
                    kuduClient.close();
                    LOG.info("kuduClient close success");
                } catch (KuduException e) {
                    e.printStackTrace();
                }
            }
        }

        //这里可以做多个 where过滤的 但是比较麻烦 考虑到 or and,有需要时再扩展
        //id >"1"
        public void  handleWhere(KuduScanner.KuduScannerBuilder kuduScannerBuilder) {
            this.where = configuration.getConfiguration(KuduConstant.WHERE);
            String column= where.getNecessaryValue(KuduConstant.COLUMN, KuduReaderErrorCode.KUDU_PARAM_NECESSARY).trim();
            String compare= where.getNecessaryValue(KuduConstant.COMPARE,KuduReaderErrorCode.KUDU_PARAM_NECESSARY).trim();
            String value = where.getNecessaryValue(KuduConstant.VALUE, KuduReaderErrorCode.KUDU_PARAM_NECESSARY).trim();
            ColumnSchema schemaColumn = tableSchema.getColumn(column);
            KuduPredicate.ComparisonOp comparisonOp ;
            switch (compare){
                case ">" :
                    comparisonOp = KuduPredicate.ComparisonOp.GREATER;
                    break;
                case ">=" :
                    comparisonOp = KuduPredicate.ComparisonOp.GREATER_EQUAL;
                    break;
                case "<" :
                    comparisonOp = KuduPredicate.ComparisonOp.LESS;
                    break;
                case "<=" :
                    comparisonOp = KuduPredicate.ComparisonOp.LESS_EQUAL;
                    break;
                case "=" :
                    comparisonOp = KuduPredicate.ComparisonOp.EQUAL;
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + compare);
            }
            Object row;
            switch (schemaColumn.getType()) {
                case INT8:
                    row = Byte.parseByte(value);;
                    break;
                case INT16:
                    row = Short.parseShort(value);
                    break;
                case INT32:
                case INT64:
                    row = Integer.parseInt(value);
                    break;
                case UNIXTIME_MICROS:
                    row = Long.parseLong(value);
                    break;
                case STRING:
                    row = value;
                    break;
                case DECIMAL:
                    row=new BigDecimal(value);
                    break;
                case BOOL:
                    row=Boolean.valueOf(value);
                    break;
                case DOUBLE:
                    row=Double.valueOf(value);
                    break;
                case FLOAT:
                     row=Float.valueOf(value);
                     break;
                default:
                    LOG.warn("got unknown type  for column '{}'-- ignoring this column",
                            schemaColumn);
                    throw new DataXException(KuduReaderErrorCode.KUDU_COLUMN_TYPE_NOT_SUPPORT,"暂不支持该类型"+schemaColumn);
            }
            LOG.info("schemaColumn={},comparisonOp={},row={}",schemaColumn,comparisonOp,row);
            KuduPredicate predicate = KuduPredicate.newComparisonPredicate(schemaColumn, comparisonOp, row);
            kuduScannerBuilder.addPredicate(predicate);
        }
    }
}
import com.tencent.s2.dataingestion.common.spi.ErrorCode;

public enum KuduReaderErrorCode implements ErrorCode {
    KUDU_PARAM_NECESSARY("KUDU_ErrCode-1","参数必须存在"),
    KUDU_COLUMN_TYPE_NOT_SUPPORT("KUDU_ErrCode-1","kudu列的类型暂不支持"),
    KUDU_ONLY_ONE_CONNECTION("KUDU_ErrCode-2","kuduReader只能配置一个kuduConnection");
    private final String code;
    private final String describe;

    KuduReaderErrorCode(String code, String describe) {
        this.code = code;
        this.describe = describe;
    }

    @Override
    public String getCode() {
        return this.code;
    }

    @Override
    public String getDescription() {
        return this.describe;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Describe:[%s]. ", this.code,
                this.describe);
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-04 00:01:46  更:2022-06-04 00:02:10 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 3:31:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码