IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> shardingJDBC-归并引擎源码解析 -> 正文阅读

[大数据]shardingJDBC-归并引擎源码解析

使用sharding时,有些语句可能被分发到多个数据库节点执行,之后将这些结果汇总加工返回到客户端。归并引擎的作用便是完成对结果的汇总加工,也叫作结果归并。
sharding结果归并从功能上分为遍历、排序、分组、分页和聚合5种类型,从结构划分可分为流式归并、内存归并和装饰者归并,流式归并和内存归并是互斥的,装饰者归并可以在流式归并和内存归并之上做进一步的处理。

流式归并是指每一次从结果集中获取到的数据,都能够通过逐条获取的方式返回正确的单条数据,它与数据库原生的返回结果集的方式最为契合。遍历、排序以及流式分组都属于流式归并的一种。
内存归并则是需要将结果集的所有数据都遍历并存储在内存中,再通过统一的分组、排序以及聚合等计算之后,再将其封装成为逐条访问的数据结果集返回。
装饰者归并是对所有的结果集归并进行统一的功能增强,目前装饰者归并有分页归并和聚合归并这2种类型。

本文接下来介绍select语句的结果归并,对于其他语句不再本文介绍范围内。

1、归并引擎

归并引擎根据select语句特点,查找合适的归并处理策略,将数据库执行结果封装到MergedResult对象中。sharding对MergedResult使用ShardingResultSet封装,ShardingResultSet实现了ResultSet,应用程序获得便是ShardingResultSet对象,ShardingResultSet将对其方法的调用都委托给了MergedResult。所以应用程序中调用ShardingResultSet,其实就是调用MergedResult。
从上面的介绍中,可以看到MergedResult的作用:持有多个数据库的结果集对象(ResultSet),当应用程序获取数据库结果时,可以在多个结果集之间自动切换,使得应用程序就像使用一个结果集对象而不是多个。
sharding提供了12个功能各不相同的MergedResult实现类,归并引擎的作用便是查找合适的MergedResult实现类,并创建MergedResult对象,将该对象返回给上层调用。
sharding提供了两种归并引擎实现类:ShardingResultMergerEngine和EncryptResultDecoratorEngine,后者在加密场景中使用,一般情况下使用的是ShardingResultMergerEngine。

2、MergedResult

先来看一下MergedResult接口:

public interface MergedResult {
    
    /**
     * Iterate next data.
     */
    boolean next() throws SQLException;
    
    /**
     * Get data value.
     *
     * @param columnIndex column index
     * @param type class type of data value
     * @return data value
     */
    Object getValue(int columnIndex, Class<?> type) throws SQLException;
    
    /**
     * Get calendar value.
     *
     * @param columnIndex column index
     * @param type class type of data value
     * @param calendar calendar
     * @return calendar value
     */
    Object getCalendarValue(int columnIndex, Class<?> type, Calendar calendar) throws SQLException;
    
    /**
     * Get InputStream.
     *
     * @param columnIndex column index
     * @param type class type of data value
     */
    InputStream getInputStream(int columnIndex, String type) throws SQLException;
    
    /**
     * Judge ResultSet is null or not.
     * 
     * @return ResultSet is null or not
     */
    boolean wasNull() throws SQLException;
}

MergedResult提供了next方法和getValue方法,next方法用于判断是否已经完成对结果集的遍历,getValue方法可以获得数据库的执行结果。
MergedResult将结果集封装,调用方调用next方法和getValue方法便可以完成对所有结果的遍历,大大简化了调用方的操作。
sharding提供了12种MergedResult实现类:

实现类作用
LimitDecoratorMergedResultmysql使用,用于分页查询
IteratorStreamMergedResult对结果集执行简单遍历
GroupByStreamMergedResult用于分组查询
OrderByStreamMergedResult排序
GroupByMemoryMergedResult分组,与上面分组不同的是,该类需要在内存中完成分组
SingleLocalDataMergedResultDAL语句使用,DAL表示数据库管理类的语句,比如show create table语句
MultipleLocalDataMergedResult暂没有使用场景
LogicTablesMergedResultDAL语句使用
ShowCreateTableMergedResultDAL语句使用
TransparentMergedResult用于update/insert/delete等语句
TopAndRowNumberDecoratorMergedResultSQL Server使用,用于分页查询
RowNumberDecoratorMergedResultOracle使用,用于分页查询

下面对部分MergedResult实现类详细介绍。

2.1、IteratorStreamMergedResult

该实现类是遍历归并,是最简单的归并方式。sharding将多个结果集对象组织成一个链表,顺次遍历链表中的每个结果集。下面看一下它的源码。

	//入参queryResults是结果集对象组成的链表
    public IteratorStreamMergedResult(final List<QueryResult> queryResults) {
        this.queryResults = queryResults.iterator();
        setCurrentQueryResult(this.queryResults.next());
    }
    //next方法遍历完当前的结果集对象,便切换到链表中下一个结果集对象
    @Override
    public boolean next() throws SQLException {
        if (getCurrentQueryResult().next()) {
            return true;
        }
        if (!queryResults.hasNext()) {
            return false;
        }
        //设置当前正在访问的结果集对象
        setCurrentQueryResult(queryResults.next());
        boolean hasNext = getCurrentQueryResult().next();
        if (hasNext) {
            return true;
        }
        while (!hasNext && queryResults.hasNext()) {
            setCurrentQueryResult(queryResults.next());
            hasNext = getCurrentQueryResult().next();
        }
        return hasNext;
    }

2.2、OrderByStreamMergedResult

当SQL语句中有order by子句,且需要归并多个数据库节点返回的结果集时,sharding使用该类处理结果集。
因为每个数据库节点执行的SQL都有order by子句,都是已经排序过的数据,那么将每个结果集的当前数据值进行比较(通过实现Java的Comparable接口完成),并将其放入优先级队列。 每次获取下一条数据时,只需将队列顶端结果集的游标下移,并根据新游标重新进入优先级排序队列找到自己的位置即可。
该类使用的是流式归并的方式,每次next仅获取唯一正确的一条数据,极大的节省了内存的消耗。
下面我们看一下源码:

	//queryResults:多个结果集对象组成的链表
    public OrderByStreamMergedResult(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
    	//获得排序字段,也就是order by子句里面的字段名
        this.orderByItems = selectStatementContext.getOrderByContext().getItems();
        //orderByValuesQueue就是上文提到的优先级队列,结果集放入该队列中
        this.orderByValuesQueue = new PriorityQueue<>(queryResults.size());
        orderResultSetsToQueue(queryResults, selectStatementContext, schemaMetaData);
        isFirstNext = true;
    }
    
    private void orderResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
    	//获得各个结果集对象的第一个结果,并将它们放入到优先级队列中
        for (QueryResult each : queryResults) {
            OrderByValue orderByValue = new OrderByValue(each, orderByItems, selectStatementContext, schemaMetaData);
            if (orderByValue.next()) {
                orderByValuesQueue.offer(orderByValue);
            }
        }
        setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
    }
    
    @Override
    public boolean next() throws SQLException {
        if (orderByValuesQueue.isEmpty()) {
            return false;
        }
        if (isFirstNext) {
            isFirstNext = false;
            return true;
        }
        OrderByValue firstOrderByValue = orderByValuesQueue.poll();
        if (firstOrderByValue.next()) {
            orderByValuesQueue.offer(firstOrderByValue);
        }
        if (orderByValuesQueue.isEmpty()) {
            return false;
        }
        setCurrentQueryResult(orderByValuesQueue.peek().getQueryResult());
        return true;
    }

优先级队列里面的排序方式是:

    public int compareTo(final OrderByValue o) {
        int i = 0;
        //遍历各个排序字段
        for (OrderByItem each : orderByItems) {
        	//调用java的Comparable接口完成排序,也对排序方向进行了判断
            int result = CompareUtil.compareTo(orderValues.get(i), o.orderValues.get(i), each.getSegment().getOrderDirection(),
                each.getSegment().getNullOrderDirection(), orderValuesCaseSensitive.get(i));
            if (0 != result) {
                return result;
            }
            i++;
        }
        return 0;
    }

2.3、GroupByStreamMergedResult

处理group by子句,sharding提供了两种处理方式,一种是流式处理,一种是内存中再分组加工处理。两者的区别是,如果order by子句的字段与group by子句的字段一样,则使用流水处理,如果不一样,则使用内存再加工处理。本小节先介绍流式处理,下一小节介绍内存处理方式。
GroupByStreamMergedResult是OrderByStreamMergedResult的子类,因此GroupByStreamMergedResult使用优先级队列对每个结果集排序。每次调用next方法时,从优先级队列里面获取下一个结果作为当前结果,然后再从优先级队列里面获得下一个结果,将该结果与第一个结果比较,如果相同,则两个结果合并然后遍历下一个,如果不同,则next方法返回。
GroupByStreamMergedResult对结果合并时,也会对sum、max、min等聚合方法进行处理。

2.4、GroupByMemoryMergedResult

因为排序字段与分组字段不同,无法使用流式方式处理结果集。GroupByMemoryMergedResult的处理方式是遍历每条数据库执行结果记录,将它们放入到一个HashMap对象中,该HashMap对象的key是group by字段的值,value是对应的数据库执行结果记录,不过如果有多个记录具有相同的group by字段值,则value是一个聚合后的记录。
遍历完所有的结果后,将HashMap的value转换为一个List对象,然后对List按照order by子句进行排序。这样每次调用next方法获得下一个记录时,就从List中返回。

2.5、LimitDecoratorMergedResult

LimitDecoratorMergedResult是一个装饰器,它会对其他的MergedResult对象进行装饰。

	//mergedResult是被装饰的MergedResult对象
	//pagination用于表示分页,记录了分页数据
    public LimitDecoratorMergedResult(final MergedResult mergedResult, final PaginationContext pagination) throws SQLException {
        super(mergedResult);
        this.pagination = pagination;
        skipAll = skipOffset();
    }
    //skipOffset方法调用mergedResult的next方法,跳过指定的分页记录
    private boolean skipOffset() throws SQLException {
        for (int i = 0; i < pagination.getActualOffset(); i++) {
            if (!getMergedResult().next()) {
                return true;
            }
        }
        rowNumber = 0;
        return false;
    }
    
    @Override
    public boolean next() throws SQLException {
        if (skipAll) {
            return false;
        }
        if (!pagination.getActualRowCount().isPresent()) {
            return getMergedResult().next();
        }
        return ++rowNumber <= pagination.getActualRowCount().get() && getMergedResult().next();
    }

3、引用

https://shardingsphere.apache.org/document/legacy/4.x/document/cn/features/sharding/principle/merge/

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-06 13:53:32  更:2022-02-06 13:54:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 1:28:03-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码