浅析spring 中的多数据源解决方案AbstractRoutingDataSource 的使用
AbstractRoutingDataSource 是spring提供的一种多数据源解决方案,其继承关系如下图所示。
上图中没有将一些属性展示出来,这里挑几个重点的属性简单分析一下。
private Map<Object, Object> targetDataSources;
private Object defaultTargetDataSource;
private boolean lenientFallback = true;
private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();
private Map<Object, DataSource> resolvedDataSources;
private DataSource resolvedDefaultDataSource;
targetDataSources 就是需要设置的多数据源,可理解为从数据源,对应的defaultTargetDataSource 可理解为主数据源,这两个属性均可通过对应的setter 进行设置。lenientFallback 直接翻译有点怪怪的,简单理解,当通过路由查找键找不到对应的数据源时,是否使用默认的数据源,默认是true。至于后面两个resolvedXXX ,其实对应的就是targetDataSources 和defaultTargetDataSource ,具体的初始化过程见afterPropertiesSet() ,因为在通过setter 设置数据源的时候,值类型不一定是DataSource ,可能为字符串,这时候就需要dataSourceLookup 将其转换为DataSource ,dataSourceLookup 一般情况下不需要我们自定义,直接使用默认的就行。
当需要操作数据库的时候,AbstractRoutingDataSource 通过getConnection() 方法获取当前需要操作的数据源的连接
@Override
public Connection getConnection() throws SQLException {
return determineTargetDataSource().getConnection();
}
具体要使用哪个数据源,则由determineTargetDataSource() 来决定
protected DataSource determineTargetDataSource() {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
Object lookupKey = determineCurrentLookupKey();
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
}
return dataSource;
}
其中,determineCurrentLookupKey() 是个抽象方法
protected abstract Object determineCurrentLookupKey();
看到这里,大致的使用方式已经基本上很清晰了,接下来就来实现它
首先,自定义配置属性,用于配置多数据源
@ConfigurationProperties(prefix = "dynamic")
public class DynamicDataSourceProperties {
private Map<String, DruidDataSource> datasource = new LinkedHashMap<>();
public Map<String, DruidDataSource> getDatasource() {
return datasource;
}
public void setDatasource(Map<String, DruidDataSource> datasource) {
this.datasource = datasource;
}
}
接下来创建配置类来对数据源进行配置
@EnableTransactionManagement
@EnableConfigurationProperties(DynamicDataSourceProperties.class)
@Configuration
public class DynamicDataSourceConfig {
private final DynamicDataSourceProperties dynamicDataSourceProperties;
public DynamicDataSourceConfig(DynamicDataSourceProperties dynamicDataSourceProperties) {
this.dynamicDataSourceProperties = dynamicDataSourceProperties;
}
@Bean
@ConfigurationProperties("spring.datasource.druid")
public DataSource dataSource() {
return DruidDataSourceBuilder.create().build();
}
@Primary
@Bean
public DynamicDataSource dynamicDataSource() {
DynamicDataSource dynamicDataSource = new DynamicDataSource();
dynamicDataSource.setTargetDataSources(getDynamicDataSource());
dynamicDataSource.setDefaultTargetDataSource(dataSource());
return dynamicDataSource;
}
private Map<Object, Object> getDynamicDataSource() {
Map<String, DruidDataSource> dataSourcePropertiesMap = dynamicDataSourceProperties.getDatasource();
return new HashMap<>(dataSourcePropertiesMap);
}
}
同时,因为使用了自定义数据源,所以需要去掉数据源的自动配置,在主启动类上的@SpringBootApplication 注解上通过exclude 属性将DataSourceAutoConfiguration 排除,如下
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class SpringBootDynamicDatasourceApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootDynamicDatasourceApplication.class, args);
}
}
回到配置类DynamicDataSourceConfig ,其中的DynamicDataSource 定义如下,其继承自AbstractRoutingDataSource 并实现了determineCurrentLookupKey() 方法来决定选用哪个查找键,此方法内则调用的是DynamicContextHolder.peek() 来获取查找键。
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DynamicContextHolder.peek();
}
}
DynamicContextHolder 主要通过ThreadLocal 来保存当前线程的数据源,并且使用双端队列Deque 来保存当前事务中涉及的到数据源,毕竟有些事务中涉及到多个数据源,比如最外层是数据源ds0,中层是ds1,内层是ds2,此时使用队列是非常合适的。
public class DynamicContextHolder {
private static final ThreadLocal<Deque<String>> CONTEXT_HOLDER = ThreadLocal.withInitial(ArrayDeque::new);
public static String peek() {
return CONTEXT_HOLDER.get().peek();
}
public static void push(String dataSource) {
CONTEXT_HOLDER.get().push(dataSource);
}
public static void poll() {
Deque<String> deque = CONTEXT_HOLDER.get();
deque.poll();
if (deque.isEmpty()) {
CONTEXT_HOLDER.remove();
}
}
}
接下来定义注解来标识使用哪个数据源
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface DynamicDataSource {
String value() default "";
}
对应的切面
@Aspect
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class DataSourceAspect {
protected Logger logger = LoggerFactory.getLogger(getClass());
@Pointcut("@annotation(com.example.annotation.DynamicDataSource) " +
"|| @within(com.example.annotation.DynamicDataSource)")
public void dataSourcePointCut() {
}
@Around("dataSourcePointCut()")
public Object around(ProceedingJoinPoint point) throws Throwable {
MethodSignature signature = (MethodSignature) point.getSignature();
Class<?> targetClass = point.getTarget().getClass();
Method method = signature.getMethod();
DynamicDataSource targetDataSource = targetClass.getAnnotation(DynamicDataSource.class);
DynamicDataSource methodDataSource = method.getAnnotation(DynamicDataSource.class);
if (targetDataSource != null || methodDataSource != null) {
String value;
if (methodDataSource != null) {
value = methodDataSource.value();
} else {
value = targetDataSource.value();
}
DynamicContextHolder.push(value);
logger.info(">>> set datasource success {}", value);
} else {
logger.info(">>> use default datasource...");
}
try {
return point.proceed();
} finally {
DynamicContextHolder.poll();
}
}
}
在切面中,切点同时使用了@annotation 和@within ,前者是方法级别,用于拦截方法上的注解,后者是对象级别,用于拦截类上的注解。在设置数据源的时候,优先使用方法级别,其次才使用类级别。
接下来新建对应的库和测试表来进行测试。
create database if not exists `db0` default character set utf8mb4 collate utf8mb4_bin;
use `db0`;
create table if not exists user_info
(
id bigint primary key auto_increment,
name varchar(64),
age tinyint
);
create database if not exists `db1` default character set utf8mb4 collate utf8mb4_bin;
use `db1`;
create table if not exists user_info
(
id bigint primary key auto_increment,
name varchar(64),
age tinyint
);
create database if not exists `db2` default character set utf8mb4 collate utf8mb4_bin;
use `db2`;
create table if not exists user_info
(
id bigint primary key auto_increment,
name varchar(64),
age tinyint
);
insert into db0.user_info(name, age) VALUE ('jack','18');
insert into db1.user_info(name, age) VALUE ('mary','18');
insert into db2.user_info(name, age) VALUE ('john','18');
创建对应的实体类和mybatis 操作接口,代码略。
然后配置数据源
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
druid:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/db0?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=false
username: root
password: root
dynamic:
datasource:
dataSource01:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/db1?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=false
username: root
password: root
dataSource02:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/db2?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=false
username: root
password: root
接下来就可以测试了,有如下类,类中有三个方法,分别来更新三个数据源的数据的age字段,同时配置了事务的传播类型为REQUIRED ,即若当前不存在事务,则创建新的事务,若存在,则加入当前事务。
@Slf4j
@Service
@DynamicDataSource
public class TestService {
@Autowired
private UserInfoMapper userInfoMapper;
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public void updateDefault() {
UserInfo info = new UserInfo().setId(1L).setAge(1);
userInfoMapper.updateById(info);
log.info("ds0: {}", userInfoMapper.selectById(info.pkVal()));
}
@DynamicDataSource("dataSource01")
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public void updateDS1() {
UserInfo info = new UserInfo().setId(1L).setAge(2);
userInfoMapper.updateById(info);
log.info("ds1: {}", userInfoMapper.selectById(info.pkVal()));
}
@DynamicDataSource("dataSource02")
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public void updateDS2() {
UserInfo info = new UserInfo().setId(1L).setAge(3);
userInfoMapper.updateById(info);
log.info("ds2: {}", userInfoMapper.selectById(info.pkVal()));
}
}
有如下测试类
@SpringBootTest
public class SpringBootDynamicDatasourceApplicationTests {
@Autowired
TestService testService;
@Rollback(false)
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
@Test
public void test() {
testService.updateDefault();
testService.updateDS1();
testService.updateDS2();
}
}
按照理解的,执行完上述测试方法后,三个数据源对应的表中,id为1的数据的age应该分别被更新为1,2,3。
接下来就来执行它,执行之后,日志如下
发现问题了,从绿色框中可以看到,切面起作用了,但是为何三个方法内打印的UserInfo都是ds0的呢,按理来说,后面两个不应该是ds1,ds2吗?问题出在哪呢?
有经验的大佬应该已经知道了,问题就出在事务的传播级别上,从图中也可以看出,事务的开启和关闭仅有一次,那就改变传播级别再来测试一下。
首先,可以知道的是,如果将测试方法上的事务去掉,那么TestService 服务的三个方法会运行在各自的事务中,互不相关。验证一下,如下,注释掉测试方法上的事务注解,TestService 不变
@Rollback(false)
@Test
public void test() {
testService.updateDefault();
testService.updateDS1();
testService.updateDS2();
}
执行后,日志如下,可以看到,数据源是切换了,结果是符合预期的,但是有一个大问题,三个方法分别运行在各自的事务中,这就无法保证一致性了,例如三个方法中前面的执行成功了,但是后面的执行失败了,那么前面的是不会回滚的(面试常问的事务失效场景之一)。
为了验证这个问题,在上面的基础上,修改TestService 的第三个方法updateDS2 ,让方法抛出一个异常
@DynamicDataSource("dataSource02")
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public void updateDS2() {
UserInfo info = new UserInfo().setId(1L).setAge(3);
userInfoMapper.updateById(info);
log.info("ds2: {}", userInfoMapper.selectById(info.pkVal()));
int i = 1/0;
}
再来执行一次(在重新执行测试方法之前,需要先将数据还原),日志如下。这时候再来查询数据库,会发现前两个没有回滚,但第三个回滚了,所以去掉测试方法上的事务注解显然是不可行的。
同样的,如果恢复测试方法上的事务注解,但是将TestService 的三个方法上的事务传播类型由REQUIRED 改为REQUIRES_NEW ,结果与上面的测试是类似的。虽然最外层有一个事务了,但是里面的三个方法分别开启了新事务,所以最后一个方法抛出的异常不会对前两个方法的事务产生影响。
也就是说,如果进入切面前存在事务,那么即便切面里面将数据源放入当前线程的ThreadLocal 了,AbstractRoutingDataSource 也不会进行切换。不信可以在自定义的实现类DynamicDataSource 中的determineCurrentLookupKey 方法里面打个断点试一下。
总结,AbstractRoutingDataSource 切换数据源,适合目标方法(要切换数据源的方法)外层没有被事务包裹或目标方法运行于独立的事务之中才有效。
|