IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Datax-Web之reader和writer源码分析 -> 正文阅读

[大数据]Datax-Web之reader和writer源码分析

一、读写(reader和writer)组件共祖先:BaseDataxPlugin
reader和writer父类声明如下:
public abstract class BaseWriterPlugin extends BaseDataxPlugin
public abstract class BaseReaderPlugin extends BaseDataxPlugin

BaseReaderPlugin又是其他具体reader类的基类
BaseWriterPlugin又是其他具体writer类的基类

关系如下:
BaseDataxPlugin
? ? BaseReaderPlugin
? ? ? ? MysqlReader
? ? ? ? OracleReader
? ? ? ? 等等
? ? BaseWriterPlugin
? ? ? ? HiveWriter
? ? ? ? MysqlWriter
二、 基类分析,即parameter参数分析
{
? "job": {
? ? "content": [
? ? ? {
? ? ? ? "reader": {
? ? ? ? ? "name": "mysqlreader",
? ? ? ? ? "parameter": {
? ? ? ? ? ? "column": ["id_", "topic_id_", "speak_id_", "file_id_"],
? ? ? ? ? ? "connection": [
? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? "jdbcUrl": ["jdbc:mysql://*.*.*.*:3306/a?useUnicode=true&characterEncoding=UTF-8"],
? ? ? ? ? ? ? ? "table": ["a"]
? ? ? ? ? ? ? }
? ? ? ? ? ? ],
? ? ? ? ? ? "password": "*****",
? ? ? ? ? ? "username": "a",
? ? ? ? ? ? "where": "",
? ? ? ? ? ? "session": ["set names utf8mb4"]

? ? ? ? ? }
? ? ? ? },
? ? ? ? "writer": {
? ? ? ? ? "name": "mysqlwriter",
? ? ? ? ? "parameter": {
? ? ? ? ? ? "column": ["id_", "topic_id_", "speak_id_", "file_id_"],
? ? ? ? ? ? "connection": [
? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? "jdbcUrl": "jdbc:mysql://*.*.*.*:3306/b?useUnicode=true&characterEncoding=UTF-8",
? ? ? ? ? ? ? ? "table": ["b"]
? ? ? ? ? ? ? }
? ? ? ? ? ? ],
? ? ? ? ? ? "password": "****",
? ? ? ? ? ? "username": "b",
? ? ? ? ? ? "where": "",
? ? ? ? ? ? "session": ["set names utf8mb4"]

? ? ? ? ? }
? ? ? ? }
? ? ? }
? ? ],
? ? "setting": {
? ? ? "speed": {
? ? ? ? "channel": "5"
?}
? ? }
? }
}
1)BaseReaderPlugin
? ? //关系型数据库参数构造
? ? @Override
? ? public Map<String, Object> build(DataxRdbmsPojo plugin) {
? ? ? ? //构建
? ? ? ? Map<String, Object> readerObj = Maps.newLinkedHashMap();
? ? ? ? readerObj.put("name", getName());
? ? ? ? Map<String, Object> parameterObj = Maps.newLinkedHashMap();
? ? ? ? Map<String, Object> connectionObj = Maps.newLinkedHashMap();

? ? ? ? JobDatasource jobDatasource = plugin.getJobDatasource();
? ? ? ? parameterObj.put("username", jobDatasource.getJdbcUsername());
? ? ? ? parameterObj.put("password", jobDatasource.getJdbcPassword());

? ? ? ? //判断是否是 querySql ,说明querySql优先级比column配置高
? ? ? ? if (StrUtil.isNotBlank(plugin.getQuerySql())) {
? ? ? ? ? ? connectionObj.put("querySql", ImmutableList.of(plugin.getQuerySql()));
? ? ? ? } else {
? ? ? ? ? ? parameterObj.put("column", plugin.getRdbmsColumns());
? ? ? ? ? ? //判断是否有where
? ? ? ? ? ? if (StringUtils.isNotBlank(plugin.getWhereParam())) {
? ? ? ? ? ? ? ? parameterObj.put("where", plugin.getWhereParam());
? ? ? ? ? ? }
? ? ? ? ? ? connectionObj.put("table", plugin.getTables());
? ? ? ? }
? ? ? ? parameterObj.put("splitPk",plugin.getSplitPk());
? ? ? ? connectionObj.put("jdbcUrl", ImmutableList.of(jobDatasource.getJdbcUrl()));

? ? ? ? parameterObj.put("connection", ImmutableList.of(connectionObj));

? ? ? ? readerObj.put("parameter", parameterObj);

? ? ? ? return readerObj;
? ? }

? ? // Hive参数构造,具体实现见HiveReader
? ? @Override
? ? public Map<String, Object> buildHive(DataxHivePojo dataxHivePojo) {
? ? ? ? return null;
? ? }

? ? // Hbase参数构造,具体实现见HbaseReader
? ? @Override
? ? public Map<String, Object> buildHbase(DataxHbasePojo dataxHbasePojo) { return null; }

? ? // MongoDB参数构造,具体实现见MongoDBReader
? ? @Override
? ? public Map<String, Object> buildMongoDB(DataxMongoDBPojo dataxMongoDBPojo) {
? ? ? ? return null;
? ? }
2)BaseWriterPlugin
具体代码,类似baseReader,hive,hbase和mongodb参数构造需要在子类实现
@Override
? ? public Map<String, Object> build(DataxRdbmsPojo plugin) {
? ? ? ? Map<String, Object> writerObj = Maps.newLinkedHashMap();
? ? ? ? writerObj.put("name", getName());

? ? ? ? Map<String, Object> parameterObj = Maps.newLinkedHashMap();
// ? ? ? ?parameterObj.put("writeMode", "insert");
? ? ? ? JobDatasource jobDatasource = plugin.getJobDatasource();
? ? ? ? parameterObj.put("username", jobDatasource.getJdbcUsername());
? ? ? ? parameterObj.put("password", jobDatasource.getJdbcPassword());
? ? ? ? parameterObj.put("column", plugin.getRdbmsColumns());
? ? ? ? parameterObj.put("preSql", splitSql(plugin.getPreSql()));
? ? ? ? parameterObj.put("postSql", splitSql(plugin.getPostSql()));

? ? ? ? Map<String, Object> connectionObj = Maps.newLinkedHashMap();
? ? ? ? connectionObj.put("table", plugin.getTables());
? ? ? ? connectionObj.put("jdbcUrl", jobDatasource.getJdbcUrl());

? ? ? ? parameterObj.put("connection", ImmutableList.of(connectionObj));
? ? ? ? writerObj.put("parameter", parameterObj);

? ? ? ? return writerObj;
? ? }

? ? private String[] splitSql(String sql) {
? ? ? ? String[] sqlArr = null;
? ? ? ? if (StringUtils.isNotBlank(sql)) {
? ? ? ? ? ? Pattern p = Pattern.compile("\r\n|\r|\n|\n\r");
? ? ? ? ? ? Matcher m = p.matcher(sql);
? ? ? ? ? ? String sqlStr = m.replaceAll(Constants.STRING_BLANK);
? ? ? ? ? ? sqlArr = sqlStr.split(Constants.SPLIT_COLON);
? ? ? ? }
? ? ? ? return sqlArr;
? ? }

? ? @Override
? ? public Map<String, Object> buildHive(DataxHivePojo dataxHivePojo) {
? ? ? ? return null;
? ? }


? ? @Override
? ? public Map<String, Object> buildHbase(DataxHbasePojo dataxHbasePojo) {
? ? ? ? return null;
? ? }

? ? @Override
? ? public Map<String, Object> buildMongoDB(DataxMongoDBPojo plugin) {
? ? ? ? return null;
? ? }

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-21 19:02:54  更:2022-05-21 19:04:56 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 6:30:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码