IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Oracle到PG的数据迁移 -> 正文阅读

[大数据]Oracle到PG的数据迁移

Oracle和PG的数据类型不尽相同,在数据迁移时需要考虑类型转换,以下是我在项目中遇到过的类型转换。

From OracleTo PG
VARCHAR2varchar
NUMBERnumeric
CLOBtext
BLOBbytea

下面是数据迁移的核心代码:


public class DataCopyService {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataCopyService.class);
    private final DAO oracleDao;
    private final DAO pgDao;

    public DataCopyService(DAO oracleDao, DAO pgDao) {
        this.oracleDao = oracleDao;
        this.pgDao = pgDao;
    }

    public boolean verify(String table) throws SQLException {
        String sql = "select count(1) as row_count from " + table;
        int rowCountOracle = oracleDao.queryForObject(sql,
                ps -> {},
                rs -> rs.getInt("row_count"));
        int rowCountPg = pgDao.queryForObject(sql,
                ps -> {},
                rs -> rs.getInt("row_count"));
        LOGGER.info("table={}, rowCountInOracle={}, rowCountInPg={}, isRowCountSame={}",
                table, rowCountOracle, rowCountPg, rowCountOracle == rowCountPg);
        return rowCountOracle == rowCountPg;
    }

    public boolean copy(String table) throws SQLException {
        long start = System.currentTimeMillis();
        List<Column> oracleColumns = getOracleColumns(table);
        List<Column> pgColumns = getPgColumns(table);
        String selectSql = constructSelectSql(table, oracleColumns);
        String insertSql = constructInsertSql(table, pgColumns);
        LOGGER.info("Going to run select sql: {}", selectSql);
        oracleDao.query(selectSql, DAO.ProcessPreparedStatement.noop(), rs -> {
            pgDao.modify(insertSql, ps -> {
                try {
                    for (int i = 1; i <= oracleColumns.size(); i++) {
                        Column column = oracleColumns.get(i - 1);
                        if (column.getDataType().contains("NUMBER")) {
                            String value = rs.getString(column.getColumnName());
                            if (rs.wasNull()) {
                                ps.setNull(i, Types.NUMERIC);
                            } else if (value.contains(".")) {
                                ps.setDouble(i, Double.parseDouble(value));
                            } else {
                                ps.setLong(i, Long.parseLong(value));
                            }
                        } else if (column.getDataType().contains("VARCHAR")) {
                            ps.setString(i, rs.getString(column.getColumnName()));
                        } else if (column.getDataType().contains("TIMESTAMP")) {
                            ps.setTimestamp(i, rs.getTimestamp(column.getColumnName()));
                        } else if (column.getDataType().contains("DATE")) {
                            ps.setDate(i, rs.getDate(column.getColumnName()));
                        } else if (column.getDataType().contains("BLOB")) {
                            if (null != rs.getBlob(column.getColumnName())) {
                                ps.setBinaryStream(i, rs.getBlob(column.getColumnName()).getBinaryStream(), rs.getBlob(column.getColumnName()).length());
                            } else {
                                ps.setNull(i, Types.BINARY);
                            }
                        } else if (column.getDataType().contains("CLOB")) {
                            String data;
                            if (null != rs.getClob(column.getColumnName())) {
                                Clob clob = rs.getClob(column.getColumnName());
                                Reader r = clob.getCharacterStream();
                                StringBuffer buffer = new StringBuffer();
                                int ch;
                                while (true) {
                                    if (!((ch = r.read()) != -1)) {
                                        break;
                                    }
                                    buffer.append("" + (char) ch);
                                }
                                data = buffer.toString();
                            } else {
                                data = "";
                            }
                            ps.setString(i, data);
                        } else {
                            ps.setString(i, rs.getString(column.getColumnName()));
                        }
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            return null;
        });
        LOGGER.info("Complete data copy, table={}, timeCost={}ms", table, System.currentTimeMillis() - start);
        return true;
    }

    private String constructSelectSql(String table, List<Column> columns) {
        return String.format("select %s from %s", columns.stream().map(Column::getColumnName).collect(Collectors.joining(",")), table);
    }

    private String constructInsertSql(String table, List<Column> columns) {
        String columnNames = columns.stream().map(Column::getColumnName).collect(Collectors.joining(","));
        String placeHolders = columns.stream().map(it -> "?").collect(Collectors.joining(","));
        return String.format("insert into %s (%s) values (%s)", table, columnNames, placeHolders);
    }

    private List<Column> getOracleColumns(String tableName) throws SQLException {
        String sql = "select * from ALL_TAB_COLUMNS where TABLE_NAME=? order by COLUMN_ID";
        return oracleDao.query(sql,
                ps -> ps.setString(1, tableName.toUpperCase()),
                rs -> Column.builder()
                        .columnName(rs.getString("COLUMN_NAME").toUpperCase())
                        .dataType(rs.getString("DATA_TYPE"))
                        .nullable("Y".equalsIgnoreCase(rs.getString("NULLABLE")))
                        .columnId(rs.getInt("COLUMN_ID"))
                        .build());
    }

    private List<Column> getPgColumns(String tableName) throws SQLException {
        String sql = "select * from information_schema.columns where table_name=? order by ordinal_position";
        return pgDao.query(sql,
                ps -> ps.setString(1, tableName.toLowerCase()),
                rs -> Column.builder()
                        .columnName(rs.getString("column_name").toUpperCase())
                        .dataType(rs.getString("data_type"))
                        .nullable("YES".equalsIgnoreCase(rs.getString("is_nullable")))
                        .columnId(rs.getInt("ordinal_position"))
                        .build());
    }

}

Demo代码在Github?https://github.com/bin9wei/oracle-to-pg-data-copy?欢迎试玩!

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-04 11:06:05  更:2022-02-04 11:07:10 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 1:07:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码