一、读写(reader和writer)组件共祖先:BaseDataxPlugin reader和writer父类声明如下: public abstract class BaseWriterPlugin extends BaseDataxPlugin public abstract class BaseReaderPlugin extends BaseDataxPlugin
BaseReaderPlugin又是其他具体reader类的基类 BaseWriterPlugin又是其他具体writer类的基类
关系如下: BaseDataxPlugin ? ? BaseReaderPlugin ? ? ? ? MysqlReader ? ? ? ? OracleReader ? ? ? ? 等等 ? ? BaseWriterPlugin ? ? ? ? HiveWriter ? ? ? ? MysqlWriter 二、 基类分析,即parameter参数分析 { ? "job": { ? ? "content": [ ? ? ? { ? ? ? ? "reader": { ? ? ? ? ? "name": "mysqlreader", ? ? ? ? ? "parameter": { ? ? ? ? ? ? "column": ["id_", "topic_id_", "speak_id_", "file_id_"], ? ? ? ? ? ? "connection": [ ? ? ? ? ? ? ? { ? ? ? ? ? ? ? ? "jdbcUrl": ["jdbc:mysql://*.*.*.*:3306/a?useUnicode=true&characterEncoding=UTF-8"], ? ? ? ? ? ? ? ? "table": ["a"] ? ? ? ? ? ? ? } ? ? ? ? ? ? ], ? ? ? ? ? ? "password": "*****", ? ? ? ? ? ? "username": "a", ? ? ? ? ? ? "where": "", ? ? ? ? ? ? "session": ["set names utf8mb4"] ? ? ? ? ? } ? ? ? ? }, ? ? ? ? "writer": { ? ? ? ? ? "name": "mysqlwriter", ? ? ? ? ? "parameter": { ? ? ? ? ? ? "column": ["id_", "topic_id_", "speak_id_", "file_id_"], ? ? ? ? ? ? "connection": [ ? ? ? ? ? ? ? { ? ? ? ? ? ? ? ? "jdbcUrl": "jdbc:mysql://*.*.*.*:3306/b?useUnicode=true&characterEncoding=UTF-8", ? ? ? ? ? ? ? ? "table": ["b"] ? ? ? ? ? ? ? } ? ? ? ? ? ? ], ? ? ? ? ? ? "password": "****", ? ? ? ? ? ? "username": "b", ? ? ? ? ? ? "where": "", ? ? ? ? ? ? "session": ["set names utf8mb4"] ? ? ? ? ? } ? ? ? ? } ? ? ? } ? ? ], ? ? "setting": { ? ? ? "speed": { ? ? ? ? "channel": "5" ?} ? ? } ? } } 1)BaseReaderPlugin ? ? //关系型数据库参数构造 ? ? @Override ? ? public Map<String, Object> build(DataxRdbmsPojo plugin) { ? ? ? ? //构建 ? ? ? ? Map<String, Object> readerObj = Maps.newLinkedHashMap(); ? ? ? ? readerObj.put("name", getName()); ? ? ? ? Map<String, Object> parameterObj = Maps.newLinkedHashMap(); ? ? ? ? Map<String, Object> connectionObj = Maps.newLinkedHashMap();
? ? ? ? JobDatasource jobDatasource = plugin.getJobDatasource(); ? ? ? ? parameterObj.put("username", jobDatasource.getJdbcUsername()); ? ? ? ? parameterObj.put("password", jobDatasource.getJdbcPassword());
? ? ? ? //判断是否是 querySql ,说明querySql优先级比column配置高 ? ? ? ? if (StrUtil.isNotBlank(plugin.getQuerySql())) { ? ? ? ? ? ? connectionObj.put("querySql", ImmutableList.of(plugin.getQuerySql())); ? ? ? ? } else { ? ? ? ? ? ? parameterObj.put("column", plugin.getRdbmsColumns()); ? ? ? ? ? ? //判断是否有where ? ? ? ? ? ? if (StringUtils.isNotBlank(plugin.getWhereParam())) { ? ? ? ? ? ? ? ? parameterObj.put("where", plugin.getWhereParam()); ? ? ? ? ? ? } ? ? ? ? ? ? connectionObj.put("table", plugin.getTables()); ? ? ? ? } ? ? ? ? parameterObj.put("splitPk",plugin.getSplitPk()); ? ? ? ? connectionObj.put("jdbcUrl", ImmutableList.of(jobDatasource.getJdbcUrl()));
? ? ? ? parameterObj.put("connection", ImmutableList.of(connectionObj));
? ? ? ? readerObj.put("parameter", parameterObj);
? ? ? ? return readerObj; ? ? }
? ? // Hive参数构造,具体实现见HiveReader ? ? @Override ? ? public Map<String, Object> buildHive(DataxHivePojo dataxHivePojo) { ? ? ? ? return null; ? ? }
? ? // Hbase参数构造,具体实现见HbaseReader ? ? @Override ? ? public Map<String, Object> buildHbase(DataxHbasePojo dataxHbasePojo) { return null; }
? ? // MongoDB参数构造,具体实现见MongoDBReader ? ? @Override ? ? public Map<String, Object> buildMongoDB(DataxMongoDBPojo dataxMongoDBPojo) { ? ? ? ? return null; ? ? } 2)BaseWriterPlugin 具体代码,类似baseReader,hive,hbase和mongodb参数构造需要在子类实现 @Override ? ? public Map<String, Object> build(DataxRdbmsPojo plugin) { ? ? ? ? Map<String, Object> writerObj = Maps.newLinkedHashMap(); ? ? ? ? writerObj.put("name", getName());
? ? ? ? Map<String, Object> parameterObj = Maps.newLinkedHashMap(); // ? ? ? ?parameterObj.put("writeMode", "insert"); ? ? ? ? JobDatasource jobDatasource = plugin.getJobDatasource(); ? ? ? ? parameterObj.put("username", jobDatasource.getJdbcUsername()); ? ? ? ? parameterObj.put("password", jobDatasource.getJdbcPassword()); ? ? ? ? parameterObj.put("column", plugin.getRdbmsColumns()); ? ? ? ? parameterObj.put("preSql", splitSql(plugin.getPreSql())); ? ? ? ? parameterObj.put("postSql", splitSql(plugin.getPostSql()));
? ? ? ? Map<String, Object> connectionObj = Maps.newLinkedHashMap(); ? ? ? ? connectionObj.put("table", plugin.getTables()); ? ? ? ? connectionObj.put("jdbcUrl", jobDatasource.getJdbcUrl());
? ? ? ? parameterObj.put("connection", ImmutableList.of(connectionObj)); ? ? ? ? writerObj.put("parameter", parameterObj);
? ? ? ? return writerObj; ? ? }
? ? private String[] splitSql(String sql) { ? ? ? ? String[] sqlArr = null; ? ? ? ? if (StringUtils.isNotBlank(sql)) { ? ? ? ? ? ? Pattern p = Pattern.compile("\r\n|\r|\n|\n\r"); ? ? ? ? ? ? Matcher m = p.matcher(sql); ? ? ? ? ? ? String sqlStr = m.replaceAll(Constants.STRING_BLANK); ? ? ? ? ? ? sqlArr = sqlStr.split(Constants.SPLIT_COLON); ? ? ? ? } ? ? ? ? return sqlArr; ? ? }
? ? @Override ? ? public Map<String, Object> buildHive(DataxHivePojo dataxHivePojo) { ? ? ? ? return null; ? ? }
? ? @Override ? ? public Map<String, Object> buildHbase(DataxHbasePojo dataxHbasePojo) { ? ? ? ? return null; ? ? }
? ? @Override ? ? public Map<String, Object> buildMongoDB(DataxMongoDBPojo plugin) { ? ? ? ? return null; ? ? }
|