自定义数据源 DataSourcePropertiesSetter
package org.apache.shardingsphere.spring.boot.datasource;
/** Hikari datasource properties setter. */
public final class HikariDataSourcePropertiesSetter implements DataSourcePropertiesSetter {
public void propertiesSet(final Environment environment, final String prefix, final String dataSourceName, final DataSource dataSource) {
Properties properties = new Properties();
String datasourcePropertiesKey = prefix + dataSourceName.trim() + ".data-source-properties";
if (PropertyUtil.containPropertyPrefix(environment, datasourcePropertiesKey)) {
Map datasourceProperties = PropertyUtil.handle(environment, datasourcePropertiesKey, Map.class);
properties.putAll(datasourceProperties);
Method method = dataSource.getClass().getMethod("setDataSourceProperties", Properties.class);
method.invoke(dataSource, properties);
}
}
@Override
public String getType() {
return "com.zaxxer.hikari.HikariDataSource";
}
}
通过实现接口 org.apache.shardingsphere.spring.boot.datasource.DataSourcePropertiesSetter 来反射方式给DataSource注入属性。?
SpringBootConfiguration#getDataSource在创建DataSource时,会根据#getType()指定类型的数据源匹配属性注入文件。
自定义数据源和配置注入器:
package com.noob.shardingJdbc.dataSoure;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import com.zaxxer.hikari.util.PropertyElf;
import java.util.Properties;
public class CustomizeHikariDataSource extends HikariDataSource {
public CustomizeHikariDataSource() { super(); }
public CustomizeHikariDataSource(HikariConfig configuration) { super(configuration); }
/* 解决sharding-jdbc在设置setDataSourceProperties后不生效的问题 */
@Override
public void setDataSourceProperties(Properties dsProperties) {
super.setDataSourceProperties(dsProperties); // 只是设置dataSourceProperties
PropertyElf.setTargetFromProperties(this, this.getDataSourceProperties()); // 还需要真实写入到dataSource的成员属性里
}
}
----
import lombok.SneakyThrows;
import org.apache.shardingsphere.spring.boot.datasource.DataSourcePropertiesSetter;
import org.apache.shardingsphere.spring.boot.datasource.HikariDataSourcePropertiesSetter;
import org.springframework.core.env.Environment;
import javax.sql.DataSource;
public final class CustomizeHikariDataSourcePropertiesSetter implements DataSourcePropertiesSetter {
HikariDataSourcePropertiesSetter setter = new HikariDataSourcePropertiesSetter();
@SneakyThrows(ReflectiveOperationException.class)
public void propertiesSet(final Environment environment, final String prefix, final String dataSourceName, final DataSource dataSource) {
setter.propertiesSet(environment, prefix, dataSourceName, dataSource);
}
@Override
public String getType() {
return "com.noob.shardingJdbc.dataSoure.CustomizeHikariDataSource";
}
}
自动配置类?SpringBootConfiguration?
package:??org.apache.shardingsphere.shardingjdbc.spring.boot
根据配置差异会选择实例化不同的DataSource类型:
按分片策略路由。 ShardingRuleCondition -> ?ShardingDataSource ?->?ShardingPreparedStatement ->?ShardingConnection ShardingRuntimeContext -> ShardingRule ->?ShardingRouter#router
区分读写请求,写走主库,读按负载策略LoadBalanceAlgorithm路由从库。 MasterSlaveRuleCondition -> MasterSlaveDataSource ->MasterSlavePreparedStatement? ->?MasterSlaveConnection MasterSlaveRuntimeContext -> MasterSlaveRule ->??MasterSlaveRouter#router ??
org.apache.shardingsphere.core.rule.
ShardingRule
org.apache.shardingsphere.api.config.sharding.
ShardingRuleConfiguration
org.apache.shardingsphere.core.rule.
TableRule
2020-03-15 17:24:46.705 [main] INFO [trace=,span=,parent=] ShardingSphere-SQL - Rule Type: sharding
2020-03-15 17:24:46.705 [main] INFO [trace=,span=,parent=] ShardingSphere-SQL - Logic SQL: select * from limit_use u , repayment_plan t where t.loan_no ='LOAN683009630195941376' and u.loan_no = t.loan_no
2020-03-15 17:24:46.705 [main] INFO [trace=,span=,parent=] ShardingSphere-SQL - SQLStatement: SelectSQLStatementContext(super=CommonSQLStatementContext(sqlStatement=org.apache.shardingsphere.sql.parser.sql.statement.dml.SelectStatement@734c98ff, tablesContext=TablesContext(tables=[Table(name=limit_use, alias=Optional.of(u)), Table(name=repayment_plan, alias=Optional.of(t))], schema=Optional.absent())), projectionsContext=ProjectionsContext(startIndex=7, stopIndex=7, distinctRow=false, projections=[ShorthandProjection(owner=Optional.absent())], columnLabels=[id, flow_no, customer_id, customer_name, certificate_no, loan_no, mobile_phone, bank_account_id, bank_account_no, repay_bank_account_id, repay_bank_account_no, contract_no, use_amount, total_periods, term_type, term, channel, loan_type, loan_type_id, repayment_method, due_day, apply_date, accept_date, settle_date, interest_rate, loan_date, interest_start_date, use_status, purpose, trade_channel, trade_status, trade_no, trade_message, business_error_code, reason, version, merge_loan_flag, merge_loan_account, create_time, update_time, id, loan_no, period, should_repayment_date, should_repayment_principal, should_repayment_interest, should_repayment_penalty, should_repayment_fee, actual_repayment_principal, actual_repayment_interest, actual_repayment_penalty, actual_repayment_fee, interest_deduction_amount, penalty_deduction_amount, remaining_principal, is_overdue, actual_repayment_date, repayment_plan_status, grace_days, version, create_time, update_time]), groupByContext=org.apache.shardingsphere.sql.parser.relation.segment.select.groupby.GroupByContext@7a203af7, orderByContext=org.apache.shardingsphere.sql.parser.relation.segment.select.orderby.OrderByContext@360e9b23, paginationContext=org.apache.shardingsphere.sql.parser.relation.segment.select.pagination.PaginationContext@67ee3b1, containsSubquery=false)
2020-03-15 17:24:46.705 [main] INFO [trace=,span=,parent=] ShardingSphere-SQL - Actual SQL: ds0 ::: select * from limit_use_08 u , repayment_plan_08 t where t.loan_no ='LOAN683009630195941376' and u.loan_no = t.loan_no
Query执行过程 ShardingPreparedStatement#executeQuery
下文以4.0.1版本的sharding模式为例 , 4.0.0-RC1 与?4.0.1 版本?在解析sq路由的变动还是挺大的。
从mybatis执行进入ShardingJdbc逻辑:"Executor ->?PreparedStatementHandler? -> PreparedStatement?" 这个流程里拿到的DataSource、Connection、Statement都是ShardingJdbc的: ShardingDataSource -> ShardingConnection -> ShardingPreparedStatement。
#executeQuery的过程:
依据分片规则 和 sql类型 解析确定路由到的具体库&表, 重写sql生成真实的脚本,解析后多个库表会有多个;?(ps.对于其他确定是单播的情况只有1个) 初始化执行器 PreparedStatementExecutor。 先按数据源分组,以下逻辑前提是在同个数据源下 :
按 “单次查询最大连接数”规则 再对sql集合分组,该集合size就是需要从该DataSource里拿到有效空闲的Connection数! (分组规则: 按“单次查询最大连接数”划分区间段,sql集合size落在哪个区间就以该区间的序列作为集合切分的最大数据条数 Lists.partition(sqlUnits, desiredPartitionSize)) 选择Connect模式:内存限制? CONNECTION_STRICTLY / 连接限制?CONNECTION_STRICTLY ? 。每一个真实sql表都有一个执行单元StatementExecuteUnit , 它们的分组隶属于ShardingExecuteGroup,内部共用同一连接! 如果一个连接需要处理多个sql表,则选择Connect模式的“连接限制” ,此时会使用MemoryQueryResult来封装ResultSet, 将数据行一并读入到内存中! 按分组ShardingExecuteGroup执行sql ,同个组内可选 并行parallel 或 串行serial 执行,得到封装sql执行结果集ResultSet的QueryResult。 数据结果集归并引擎MergeEngine 会按sql语句规则选择 流式 | 内存? 的方式来计算整合得到最终数据ShardingResultSet(MergedResult)。
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
clearPrevious(); // 清除上次执行各处理缓存
shard(); // BaseShardingEngine#shard分片解析出路由sqlRouteResult
initPreparedStatementExecutor(); // 依路由结果初始化各个分库表的执行上下文Statement
// 数据结果集QueryResult,需要选择归并方式
MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getRuntimeContext().getDatabaseType(),
connection.getRuntimeContext().getRule(), sqlRouteResult, connection.getRuntimeContext().getMetaData().getRelationMetas(), preparedStatementExecutor.executeQuery());
result = getResultSet(mergeEngine);
} finally {
clearBatch();
}
currentResultSet = result;
return result;
}
sql路由 PreparedStatementRoutingEngine#route
分片入口:?BaseShardingEngine#shard()
public SQLRouteResult shard(final String sql, final List<Object> parameters) {
List<Object> clonedParameters = cloneParameters(parameters);
// 先 PreparedStatementRoutingEngine#route:解析分库分表规则得到sql路由结果SQLRouteResult
SQLRouteResult result = executeRoute(sql, clonedParameters);
// BaseShardingEngine#rewriteAndConvert: 依据SQLRouteResult改写sql成真实的库表。
result.getRouteUnits().addAll(HintManager.isDatabaseShardingOnly() ? convert(sql, clonedParameters, result) : rewriteAndConvert(sql, clonedParameters, result));
boolean showSQL = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
if (showSQL) {
boolean showSimple = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SIMPLE);
SQLLogger.logSQL(sql, showSimple, result.getSqlStatementContext(), result.getRouteUnits());
}
return result;
}
?PreparedStatementRoutingEngine#route
public SQLRouteResult route(final List<Object> parameters) {
if (null == sqlStatement) {
sqlStatement = shardingRouter.parse(logicSQL, true);
}
// 先? 分片路由? 再? 主从库路由。
return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));
}
SQLParseEngine#parse??解析sql成语法树? ->?ShardingRouter#route() ->?RoutingEngine#route() : 按分库分表规则得到真实的库表
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
// ShardingStatementValidator.validate 对 insert、update 时 禁止对分表字段更新
Optional<ShardingStatementValidator> shardingStatementValidator = ShardingStatementValidatorFactory.newInstance(sqlStatement);
if (shardingStatementValidator.isPresent()) {
shardingStatementValidator.get().validate(shardingRule, sqlStatement, parameters);
}
SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(metaData.getRelationMetas(), logicSQL, parameters, sqlStatement);
Optional<GeneratedKey> generatedKey = sqlStatement instanceof InsertStatement
? GeneratedKey.getGenerateKey(shardingRule, metaData.getTables(), parameters, (InsertStatement) sqlStatement) : Optional.<GeneratedKey>absent();
// 入参条件
ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, generatedKey.orNull(), metaData.getRelationMetas());
boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext);
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {
checkSubqueryShardingValues(sqlStatementContext, shardingConditions);
mergeShardingConditions(shardingConditions);
}
RoutingEngine routingEngine = RoutingEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions);
// 通过条件和分表规则 路由执行
RoutingResult routingResult = routingEngine.route();
if (needMergeShardingValues) {
// 分表子查询时 需要判定RoutingUnits 惟一
Preconditions.checkState(1 == routingResult.getRoutingUnits().size(), "Must have one sharding with subquery.");
}
SQLRouteResult result = new SQLRouteResult(sqlStatementContext, shardingConditions, generatedKey.orNull());
result.setRoutingResult(routingResult);
if (sqlStatementContext instanceof InsertSQLStatementContext) {
setGeneratedValues(result);
}
return result;
}
这里重点关注RoutingEngineFactory#newInstance 依据 分片规则和sql执行上下文SQLStatementContext?等创建的不同场景RoutingEngine
public static RoutingEngine newInstance(final ShardingRule shardingRule,
final ShardingSphereMetaData metaData, final SQLStatementContext sqlStatementContext, final ShardingConditions shardingConditions) {
SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
Collection<String> tableNames = sqlStatementContext.getTablesContext().getTableNames();
if (sqlStatement instanceof TCLStatement) {
//授权、角色控制等数据库控制语言: 全库路由, 基于每个DataSourceName构建一个RoutingUnit
return new DatabaseBroadcastRoutingEngine(shardingRule);
}
if (sqlStatement instanceof DDLStatement) {
//数据定义语言:全库表路由
return new TableBroadcastRoutingEngine(shardingRule, metaData.getTables(), sqlStatementContext);
}
if (sqlStatement instanceof DALStatement) {
return getDALRoutingEngine(shardingRule, sqlStatement, tableNames);
}
if (sqlStatement instanceof DCLStatement) {
return getDCLRoutingEngine(shardingRule, sqlStatementContext, metaData);
}
if (shardingRule.isAllInDefaultDataSource(tableNames)) {
// 默认的数据库
return new DefaultDatabaseRoutingEngine(shardingRule, tableNames);
}
if (shardingRule.isAllBroadcastTables(tableNames)) {
// UnicastRoutingEngine 代表单播路由,它只需要从任意库中的任意真实表中获取数据即可。
// 例如 DESCRIBE 语句就适合使用 UnicastRoutingEngine,因为每个真实表中的数据描述结构都是相同的。
return sqlStatement instanceof SelectStatement ? new UnicastRoutingEngine(shardingRule, tableNames) : new DatabaseBroadcastRoutingEngine(shardingRule);
}
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && tableNames.isEmpty() && shardingRule.hasDefaultDataSourceName()) {
return new DefaultDatabaseRoutingEngine(shardingRule, tableNames);
}
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && shardingConditions.isAlwaysFalse() || tableNames.isEmpty() || !shardingRule.tableRuleExists(tableNames)) {
return new UnicastRoutingEngine(shardingRule, tableNames);
}
return getShardingRoutingEngine(shardingRule, sqlStatementContext, shardingConditions, tableNames); // StandardRoutingEngine(单个表或者有绑定关系的多个表) 或 ComplexRoutingEngine?
}
ShardingRule#isAllInDefaultDataSource:对于不分片的表,会判定是否有默认的数据库配置 。
// 有默认数据库 && 没有路由规则 && 不是广播表 && 有表名
public boolean isAllInDefaultDataSource(final Collection<String> logicTableNames) {
if (!hasDefaultDataSourceName()) {
return false;
}
for (String each : logicTableNames) {
if (findTableRule(each).isPresent() || isBroadcastTable(each)) {
return false;
}
}
return !logicTableNames.isEmpty();
}
ShardingDataSourceNames#getDefaultDataSourceName :?只有1个数据库时它就是默认
// 只有1个数据库,则它就是默认;否则取配置的default
public String getDefaultDataSourceName() {
return 1 == dataSourceNames.size() ? dataSourceNames.iterator().next() : shardingRuleConfig.getDefaultDataSourceName();
}
?StandardRoutingEngine#route()? -> #getDataNodes:
private Collection<DataNode> getDataNodes(final TableRule tableRule) {
if (isRoutingByHint(tableRule)) {
// 先判定指定datebase和table都是Hint路由策略!
return routeByHint(tableRule);
}
if (isRoutingByShardingConditions(tableRule)) {
// datebase和table都不是Hint策略
return routeByShardingConditions(tableRule);
}
return routeByMixedConditions(tableRule);
}
StandardRoutingEngine#routeByShardingConditions -> #routeTables :?
根据配置的分片规则获取指定的算法ShardingStrategy来计算出路由的实际的分库及分表序列
分片策略 ShardingStrategy
StandardShardingStrategy#doSharding (单字段): 只取RouteValue 集合中的第一个值作为判定依据 ComplexShardingStrategy#doSharding (混合字段): 当ListRouteValue传入的需要计算分片规则的数据值有N个就会计算N次分表,通过TreeSet去重?
public final class ListRouteValue<T extends Comparable<?>> implements RouteValue {
private final String columnName; // 表字段
private final String tableName; // 表
private final Collection<T> values; // 字段入参值
数据结果集的归并 MergeEngine
(18条消息) 剖析Sharding-Sphere系列——结果归并_ShardingSphere的博客-CSDN博客 https://blog.csdn.net/shardingsphere/article/details/99316821?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_title~default-1-99316821-blog-120070807.pc_relevant_eslanding_v1&spm=1001.2101.3001.4242.2&utm_relevant_index=4
package org.apache.shardingsphere.sharding.merge;
public final class MergeEngineFactory {
/**
* Create merge engine instance.
*
* @param databaseType database type
* @param shardingRule sharding rule 分片规则
* @param routeResult SQL route result sql路由结果
* @param relationMetas relation metas
* @param queryResults query results 查询结果集
* @return merge engine instance
*/
public static MergeEngine newInstance(final DatabaseType databaseType, final ShardingRule shardingRule,
final SQLRouteResult routeResult, final RelationMetas relationMetas, final List<QueryResult> queryResults) {
if (routeResult.getSqlStatementContext() instanceof SelectSQLStatementContext) {
return new DQLMergeEngine(databaseType, (SelectSQLStatementContext) routeResult.getSqlStatementContext(), queryResults);
}
if (routeResult.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
return new DALMergeEngine(shardingRule, queryResults, routeResult.getSqlStatementContext(), relationMetas);
}
return new TransparentMergeEngine(queryResults);
}
这里拿查询语句的 DQLMergeEngine#merge() -> #build来分析。
public MergedResult merge() throws SQLException {
if (1 == queryResults.size()) {
// 如果只有一个归并的结果集,使用了IteratorStreamMergedResult流式归并。
return new IteratorStreamMergedResult(queryResults);
}
Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));
selectSQLStatementContext.setIndexes(columnLabelIndexMap);
return decorate(build(columnLabelIndexMap));
}
// 在不同的ordrBy\ groupBy\Distinct 场景下选择哪种MergedResult
private MergedResult build(final Map<String, Integer> columnLabelIndexMap) throws SQLException {
if (isNeedProcessGroupBy()) {
// 优先判定了GroupBy
return getGroupByMergedResult(columnLabelIndexMap);
}
if (isNeedProcessDistinctRow()) {
setGroupByForDistinctRow();
return getGroupByMergedResult(columnLabelIndexMap);
}
if (isNeedProcessOrderBy()) {
// 如果只有orderby 选择StreamMergedResult
return new OrderByStreamMergedResult(queryResults, selectSQLStatementContext.getOrderByContext().getItems());
}
return new IteratorStreamMergedResult(queryResults);
}
private MergedResult getGroupByMergedResult(final Map<String, Integer> columnLabelIndexMap) throws SQLException {
// sql的排序项 order by与分组项 group by 的字段以及排序类型(ASC或DESC)保持一致时选择StreamMergedResult
return selectSQLStatementContext.isSameGroupByAndOrderByItems()
? new GroupByStreamMergedResult(columnLabelIndexMap, queryResults, selectSQLStatementContext)
: new GroupByMemoryMergedResult(queryResults, selectSQLStatementContext);
}
MemoryMergedResult 内存归并 ?需要将结果集的所有数据都遍历并存储在内存中,再通过统一的分组、排序以及聚合等计算整合。
StreamMergedResult 流式归并? 要求sql的排序项 order by与分组项 group by 的字段以及排序类型(ASC或DESC)必须保持一致!? 每一次从结果集中获取到的数据,都能够通过逐条获取 的方式返回正确的单条数据。相当于归并 算法!
在OrderByStreamMergedResult里:
有一个优先级队列‘PriorityQueue orderByValuesQueue’ 用来对每个结果集QueryResult单次下翻<#next()>后返回的单条数据实时排序。 MergedResult#next()? 和 MergedResult#getValue 配合操作QueryResult 完成归并。
public class OrderByStreamMergedResult extends StreamMergedResult {
private final Collection<OrderByItem> orderByItems;
@Getter(AccessLevel.PROTECTED)
private final Queue<OrderByValue> orderByValuesQueue;
@Getter(AccessLevel.PROTECTED)
private boolean isFirstNext;
public OrderByStreamMergedResult(final List<QueryResult> queryResults, final Collection<OrderByItem> orderByItems) throws SQLException {
this.orderByItems = orderByItems;
this.orderByValuesQueue = new PriorityQueue<>(queryResults.size());
orderResultSetsToQueue(queryResults);
isFirstNext = true;
}
...
// 下翻
public boolean next() throws SQLException {
if (orderByValuesQueue.isEmpty()) {
return false;
}
if (isFirstNext) {
isFirstNext = false;
return true;
}
OrderByValue firstOrderByValue = orderByValuesQueue.poll();
if (firstOrderByValue.next()) { // 这里最终是QueryResult.next()
orderByValuesQueue.offer(firstOrderByValue);
}
if (orderByValuesQueue.isEmpty()) {
return false;
}
setCurrentQueryResult(orderByValuesQueue.peek().getQueryResult());
return true;
}
}
数据结果集 QueryResult
ConnectionMode 的两种模式:
内存限制(MEMORY_STRICTLY):?采用流式处理StreamQueryResult,依次操作ResultSet#next() 连接限制(CONNECTION_STRICTLY) : 采用MemoryQueryResult,在实例化阶段就会一次性循环操作ResultSet#next()将数据库返回的所有数据行都读入内存。 针对于每一个数据源下: 当配置的maxConnectionsSizePerQuery(默认1)小于? 解析到真实表个数时 (一个连接处理多个表) 会选择该模式!
它们的区别是:
流式: StreamQueryResult#next() 是直接操作 ResultSet#next() 内存:?MemoryQueryResult在实例化创建时就会提前循环迭代处理ResultSet,缓存所有数据行。当调用#next()方法时使用的是该集合缓存的迭代器#next()
public final class MemoryQueryResult implements QueryResult {
private final ResultSetMetaData resultSetMetaData;
private final Iterator<List<Object>> rows; // 总数据的迭代器
private List<Object> currentRow;
public MemoryQueryResult(final ResultSet resultSet) throws SQLException {
resultSetMetaData = resultSet.getMetaData();
rows = getRows(resultSet); // 实例化时直接迭代处理ResultSet缓存全部数据
}
private Iterator<List<Object>> getRows(final ResultSet resultSet) throws SQLException {
Collection<List<Object>> result = new LinkedList<>();
while (resultSet.next()) {
List<Object> rowData = new ArrayList<>(resultSet.getMetaData().getColumnCount());
for (int columnIndex = 1; columnIndex <= resultSet.getMetaData().getColumnCount(); columnIndex++) {
Object rowValue = getRowValue(resultSet, columnIndex);
rowData.add(resultSet.wasNull() ? null : rowValue);
}
result.add(rowData);
}
return result.iterator();
}
-----
public final class StreamQueryResult implements QueryResult {
private final ResultSetMetaData resultSetMetaData;
private final ResultSet resultSet;
public StreamQueryResult(final ResultSet resultSet) throws SQLException {
resultSetMetaData = resultSet.getMetaData();
this.resultSet = resultSet;
}
@Override
public boolean next() throws SQLException {
return resultSet.next();
}
如何选择QueryResult类型
public final class RouteUnit {
private final String dataSourceName; // 数据源
private final SQLUnit sqlUnit; // 重写后的真实sql和入参关系
}
public final class SQLUnit {
private final String sql;
private final List<Object> parameters;
}
SQLRouteResult里有保存路由的实际库表信息对象RouteUnit的集合。?
RoutingResult:
[RoutingUnit(dataSourceName=ds0, masterSlaveLogicDataSourceName=ds0, tableUnits=[TableUnit(logicTableName=limit_use, actualTableName=limit_use_08), TableUnit(logicTableName=repayment_plan, actualTableName=repayment_plan_08)]),
RoutingUnit(dataSourceName=ds0, masterSlaveLogicDataSourceName=ds0, tableUnits=[TableUnit(logicTableName=limit_use, actualTableName=limit_use_00), TableUnit(logicTableName=repayment_plan, actualTableName=repayment_plan_08)])]
当线程执行到初始化PreparedStatementExecutor过程 : ShardingPreparedStatement#executeQuery() -> [? #initPreparedStatementExecutor -> PreparedStatementExecutor#init -> #obtainExecuteGroups -> SQLExecutePrepareTemplate#getSynchronizedExecuteUnitGroups? -> #getSQLExecuteGroups ]
private Collection<ShardingExecuteGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
// 按dataSource库分组
Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(routeUnits);
Collection<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>();
for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));
}
return result;
}
// 把库表信息RouteUnit集合按数据源DataSourceName分组
private Map<String, List<SQLUnit>> getSQLUnitGroups(final Collection<RouteUnit> routeUnits) {
Map<String, List<SQLUnit>> result = new LinkedHashMap<>(routeUnits.size(), 1);
for (RouteUnit each : routeUnits) {
if (!result.containsKey(each.getDataSourceName())) {
result.put(each.getDataSourceName(), new LinkedList<SQLUnit>());
}
result.get(each.getDataSourceName()).add(each.getSqlUnit());
}
return result;
}
private List<ShardingExecuteGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName,
final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
List<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>();
//按maxConnectionsSizePerQuery计算出多少个表分成同组。
int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
// 分组最终的个数就是拿Connection的个数
List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
// maxConnectionsSizePerQuery(单次查询最大连接数)< sql数量 ,使用CONNECTION_STRICTLY连接限制。
ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());
int count = 0;
for (List<SQLUnit> each : sqlUnitPartitions) {
// 同一数据源内的多个表的操作可能是会共用一个连接
result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
}
return result;
}
private ShardingExecuteGroup<StatementExecuteUnit> getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection,
final String dataSourceName, final List<SQLUnit> sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException {
List<StatementExecuteUnit> result = new LinkedList<>();
for (SQLUnit each : sqlUnitGroup) {
// 分组内每个SQLUnit 都对应一个执行单元StatementExecuteUnit, 但connection是共用的
result.add(callback.createStatementExecuteUnit(connection, new RouteUnit(dataSourceName, each), connectionMode));
}
return new ShardingExecuteGroup<>(result);
}
?将sqlRouteResult内的库表路由信息RouteUnit集合按数据源分组。 单个数据源DataSource内:
如果有多个表SQLUnit需要访问,依单次查询最大连接数 maxConnectionsSizePerQuery ,按规则计算得到单个分组的数据量阈值, 给SQLUnit集合按数量切分,同组内的SQLUnit会共用同一个连接。有N个组就从该DataSource里拿N个Connection ;? maxConnectionsSizePerQuery < SQLUnit总量 (也就是说: 一个连接处理多个表 ),则选择?ConnectionMode.CONNECTION_STRICTLY? 连接限制, 否则是? ConnectionMode.MEMORY_STRICTLY? 内存限制。 每个SQLUnit 都对应一个执行单元StatementExecuteUnit, 它们隶属于同一组ShardingExecuteGroup,connection共用。
// ShardingPropertiesConstant:
MAX_CONNECTIONS_SIZE_PER_QUERY("max.connections.size.per.query", String.valueOf(1), int.class),
当线程在 ShardingPreparedStatement#executeQuery() 里,?ShardingPreparedStatement#initPreparedStatementExecutor() 执行器初始完成 后的逻辑
[ PreparedStatementExecutor#executeQuery()? -> #executeCallback ->ShardingExecuteEngine#groupExecute ] :将前述过程里分组后的多个ShardingExecuteGroup依次执行处理。ShardingExecuteGroup内部可选 并行 或 串行 执行 <ShardingConnection#isHoldTransaction() 持有事务选择serial> 。
SqlExecuteTemplate 执行 StatementExecuteUnit 会回调 SQLExecuteCallback#executeSQL 方法,最终调用 PreparedStatementExecutor#getQueryResult 方法。
// PreparedStatementExecutor执行
public List<QueryResult> executeQuery() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
@Override
protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
return getQueryResult(statement, connectionMode);
}
};
return executeCallback(executeCallback);
}
private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
PreparedStatement preparedStatement = (PreparedStatement) statement;
ResultSet resultSet = preparedStatement.executeQuery();
getResultSets().add(resultSet);
return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);
}