使用sharding时,有些语句可能被分发到多个数据库节点执行,之后将这些结果汇总加工返回到客户端。归并引擎的作用便是完成对结果的汇总加工,也叫作结果归并。 sharding结果归并从功能上分为遍历、排序、分组、分页和聚合5种类型,从结构划分可分为流式归并、内存归并和装饰者归并,流式归并和内存归并是互斥的,装饰者归并可以在流式归并和内存归并之上做进一步的处理。
流式归并是指每一次从结果集中获取到的数据,都能够通过逐条获取的方式返回正确的单条数据,它与数据库原生的返回结果集的方式最为契合。遍历、排序以及流式分组都属于流式归并的一种。 内存归并则是需要将结果集的所有数据都遍历并存储在内存中,再通过统一的分组、排序以及聚合等计算之后,再将其封装成为逐条访问的数据结果集返回。 装饰者归并是对所有的结果集归并进行统一的功能增强,目前装饰者归并有分页归并和聚合归并这2种类型。
本文接下来介绍select语句的结果归并,对于其他语句不再本文介绍范围内。
1、归并引擎
归并引擎根据select语句特点,查找合适的归并处理策略,将数据库执行结果封装到MergedResult对象中。sharding对MergedResult使用ShardingResultSet封装,ShardingResultSet实现了ResultSet,应用程序获得便是ShardingResultSet对象,ShardingResultSet将对其方法的调用都委托给了MergedResult。所以应用程序中调用ShardingResultSet,其实就是调用MergedResult。 从上面的介绍中,可以看到MergedResult的作用:持有多个数据库的结果集对象(ResultSet),当应用程序获取数据库结果时,可以在多个结果集之间自动切换,使得应用程序就像使用一个结果集对象而不是多个。 sharding提供了12个功能各不相同的MergedResult实现类,归并引擎的作用便是查找合适的MergedResult实现类,并创建MergedResult对象,将该对象返回给上层调用。 sharding提供了两种归并引擎实现类:ShardingResultMergerEngine和EncryptResultDecoratorEngine,后者在加密场景中使用,一般情况下使用的是ShardingResultMergerEngine。
2、MergedResult
先来看一下MergedResult接口:
public interface MergedResult {
boolean next() throws SQLException;
Object getValue(int columnIndex, Class<?> type) throws SQLException;
Object getCalendarValue(int columnIndex, Class<?> type, Calendar calendar) throws SQLException;
InputStream getInputStream(int columnIndex, String type) throws SQLException;
boolean wasNull() throws SQLException;
}
MergedResult提供了next方法和getValue方法,next方法用于判断是否已经完成对结果集的遍历,getValue方法可以获得数据库的执行结果。 MergedResult将结果集封装,调用方调用next方法和getValue方法便可以完成对所有结果的遍历,大大简化了调用方的操作。 sharding提供了12种MergedResult实现类:
实现类 | 作用 |
---|
LimitDecoratorMergedResult | mysql使用,用于分页查询 | IteratorStreamMergedResult | 对结果集执行简单遍历 | GroupByStreamMergedResult | 用于分组查询 | OrderByStreamMergedResult | 排序 | GroupByMemoryMergedResult | 分组,与上面分组不同的是,该类需要在内存中完成分组 | SingleLocalDataMergedResult | DAL语句使用,DAL表示数据库管理类的语句,比如show create table语句 | MultipleLocalDataMergedResult | 暂没有使用场景 | LogicTablesMergedResult | DAL语句使用 | ShowCreateTableMergedResult | DAL语句使用 | TransparentMergedResult | 用于update/insert/delete等语句 | TopAndRowNumberDecoratorMergedResult | SQL Server使用,用于分页查询 | RowNumberDecoratorMergedResult | Oracle使用,用于分页查询 |
下面对部分MergedResult实现类详细介绍。
2.1、IteratorStreamMergedResult
该实现类是遍历归并,是最简单的归并方式。sharding将多个结果集对象组织成一个链表,顺次遍历链表中的每个结果集。下面看一下它的源码。
public IteratorStreamMergedResult(final List<QueryResult> queryResults) {
this.queryResults = queryResults.iterator();
setCurrentQueryResult(this.queryResults.next());
}
@Override
public boolean next() throws SQLException {
if (getCurrentQueryResult().next()) {
return true;
}
if (!queryResults.hasNext()) {
return false;
}
setCurrentQueryResult(queryResults.next());
boolean hasNext = getCurrentQueryResult().next();
if (hasNext) {
return true;
}
while (!hasNext && queryResults.hasNext()) {
setCurrentQueryResult(queryResults.next());
hasNext = getCurrentQueryResult().next();
}
return hasNext;
}
2.2、OrderByStreamMergedResult
当SQL语句中有order by子句,且需要归并多个数据库节点返回的结果集时,sharding使用该类处理结果集。 因为每个数据库节点执行的SQL都有order by子句,都是已经排序过的数据,那么将每个结果集的当前数据值进行比较(通过实现Java的Comparable接口完成),并将其放入优先级队列。 每次获取下一条数据时,只需将队列顶端结果集的游标下移,并根据新游标重新进入优先级排序队列找到自己的位置即可。 该类使用的是流式归并的方式,每次next仅获取唯一正确的一条数据,极大的节省了内存的消耗。 下面我们看一下源码:
public OrderByStreamMergedResult(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
this.orderByItems = selectStatementContext.getOrderByContext().getItems();
this.orderByValuesQueue = new PriorityQueue<>(queryResults.size());
orderResultSetsToQueue(queryResults, selectStatementContext, schemaMetaData);
isFirstNext = true;
}
private void orderResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
for (QueryResult each : queryResults) {
OrderByValue orderByValue = new OrderByValue(each, orderByItems, selectStatementContext, schemaMetaData);
if (orderByValue.next()) {
orderByValuesQueue.offer(orderByValue);
}
}
setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
}
@Override
public boolean next() throws SQLException {
if (orderByValuesQueue.isEmpty()) {
return false;
}
if (isFirstNext) {
isFirstNext = false;
return true;
}
OrderByValue firstOrderByValue = orderByValuesQueue.poll();
if (firstOrderByValue.next()) {
orderByValuesQueue.offer(firstOrderByValue);
}
if (orderByValuesQueue.isEmpty()) {
return false;
}
setCurrentQueryResult(orderByValuesQueue.peek().getQueryResult());
return true;
}
优先级队列里面的排序方式是:
public int compareTo(final OrderByValue o) {
int i = 0;
for (OrderByItem each : orderByItems) {
int result = CompareUtil.compareTo(orderValues.get(i), o.orderValues.get(i), each.getSegment().getOrderDirection(),
each.getSegment().getNullOrderDirection(), orderValuesCaseSensitive.get(i));
if (0 != result) {
return result;
}
i++;
}
return 0;
}
2.3、GroupByStreamMergedResult
处理group by子句,sharding提供了两种处理方式,一种是流式处理,一种是内存中再分组加工处理。两者的区别是,如果order by子句的字段与group by子句的字段一样,则使用流水处理,如果不一样,则使用内存再加工处理。本小节先介绍流式处理,下一小节介绍内存处理方式。 GroupByStreamMergedResult是OrderByStreamMergedResult的子类,因此GroupByStreamMergedResult使用优先级队列对每个结果集排序。每次调用next方法时,从优先级队列里面获取下一个结果作为当前结果,然后再从优先级队列里面获得下一个结果,将该结果与第一个结果比较,如果相同,则两个结果合并然后遍历下一个,如果不同,则next方法返回。 GroupByStreamMergedResult对结果合并时,也会对sum、max、min等聚合方法进行处理。
2.4、GroupByMemoryMergedResult
因为排序字段与分组字段不同,无法使用流式方式处理结果集。GroupByMemoryMergedResult的处理方式是遍历每条数据库执行结果记录,将它们放入到一个HashMap对象中,该HashMap对象的key是group by字段的值,value是对应的数据库执行结果记录,不过如果有多个记录具有相同的group by字段值,则value是一个聚合后的记录。 遍历完所有的结果后,将HashMap的value转换为一个List对象,然后对List按照order by子句进行排序。这样每次调用next方法获得下一个记录时,就从List中返回。
2.5、LimitDecoratorMergedResult
LimitDecoratorMergedResult是一个装饰器,它会对其他的MergedResult对象进行装饰。
public LimitDecoratorMergedResult(final MergedResult mergedResult, final PaginationContext pagination) throws SQLException {
super(mergedResult);
this.pagination = pagination;
skipAll = skipOffset();
}
private boolean skipOffset() throws SQLException {
for (int i = 0; i < pagination.getActualOffset(); i++) {
if (!getMergedResult().next()) {
return true;
}
}
rowNumber = 0;
return false;
}
@Override
public boolean next() throws SQLException {
if (skipAll) {
return false;
}
if (!pagination.getActualRowCount().isPresent()) {
return getMergedResult().next();
}
return ++rowNumber <= pagination.getActualRowCount().get() && getMergedResult().next();
}
3、引用
https://shardingsphere.apache.org/document/legacy/4.x/document/cn/features/sharding/principle/merge/
|