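The transaction takes effect, but data source switching fails.
1. Preface
Problem: when mybatis-plus-dynamicdatasource is used to switch data sources and spring-tx handles transactions at the same time, the data source does not switch to the secondary database correctly.
Most readers who open this post are looking for a fix, so we present the solution and the related best practices first, then analyze the cause and walk through the relevant source code.
2. Solution
The code below shows both the problem scenario and the solution.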
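// Console, IdUtil and SpringUtil are assumed here to be the Hutool classes.
import cn.hutool.core.lang.Console;
import cn.hutool.core.util.IdUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.baomidou.dynamic.datasource.annotation.DS;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Service
public class TestService {
    @Autowired
    private MasterMapper masterMapper;
    @Autowired
    private SlaveMapper slaveMapper;

    // Runs inside a transaction on the primary data source.
    @Transactional
    public void test() {
        Console.log("main logic");
        User user = new User();
        user.setUuid(IdUtil.simpleUUID());
        user.setXxx("12332112311");
        masterMapper.insert(user);
        // Calling slaveLook() directly on `this` would bypass the Spring proxy,
        // so @DS and the propagation setting would be ignored. Go through the bean.
        getService().slaveLook();
    }

    // NOT_SUPPORTED suspends the surrounding transaction, so this method gets
    // a fresh connection and @DS("slave") can route it to the secondary database.
    @Transactional(propagation = Propagation.NOT_SUPPORTED)
    @DS("slave")
    public void slaveLook() {
        Console.log(slaveMapper.selectById(123));
    }

    // Returns the Spring-managed (proxied) instance of this service.
    private TestService getService() {
        return SpringUtil.getBean(this.getClass());
    }
}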
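Why the call has to go through getService() rather than `this` can be illustrated without Spring. The following is a minimal sketch using a JDK dynamic proxy; `LookupService`, `LookupServiceImpl` and `SelfInvocationDemo` are hypothetical names made up for this illustration, and the recording handler only stands in for interceptors such as the ones behind @Transactional and @DS.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical service interface, used only for this illustration. */
interface LookupService {
    void test(boolean viaProxy);
    void slaveLook();
}

class LookupServiceImpl implements LookupService {
    // The proxied instance, playing the role of what getService() returns.
    private LookupService self;

    void setSelf(LookupService self) { this.self = self; }

    public void test(boolean viaProxy) {
        if (viaProxy) {
            self.slaveLook();   // goes through the proxy, so the interceptor runs
        } else {
            this.slaveLook();   // plain Java call, the interceptor never sees it
        }
    }

    public void slaveLook() { }
}

class SelfInvocationDemo {
    /** Returns the names of the methods the interceptor observed. */
    static List<String> run(boolean viaProxy) {
        List<String> intercepted = new ArrayList<>();
        LookupServiceImpl target = new LookupServiceImpl();
        // Records each intercepted method, then delegates to the target.
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName());
            return method.invoke(target, args);
        };
        LookupService proxy = (LookupService) Proxy.newProxyInstance(
                LookupService.class.getClassLoader(),
                new Class<?>[] {LookupService.class},
                handler);
        target.setSelf(proxy);
        proxy.test(viaProxy);
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [test]
        System.out.println(run(true));  // [test, slaveLook]
    }
}
```

With the direct call, only test() is intercepted; slaveLook() is intercepted only when it is invoked on the proxied instance.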
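3. Best practices
- Again, the secondary database is not covered by the transaction. If you also need transactional writes on the secondary database, use a distributed transaction manager such as Atomikos.
- @DS is best placed on the Service; this is the recommendation of the mybatis-plus-dynamicdatasource project itself.
- The getService() method above can be extracted into an interface. With Java 8 default interface methods this is very simple:

public interface SelfAop<T> {
    @SuppressWarnings("unchecked")
    default T getSelf() {
        return (T) SpringUtil.getBean(this.getClass());
    }
}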
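How a service might use such an interface can be sketched as follows. To keep the example runnable without Spring, `BeanRegistry` is a hypothetical stand-in for the container lookup done by SpringUtil.getBean, and `ReportService` is a made-up service; in a real application the registered bean would be the Spring proxy.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the Spring container lookup. */
class BeanRegistry {
    private static final Map<Class<?>, Object> BEANS = new ConcurrentHashMap<>();

    static void register(Class<?> type, Object bean) { BEANS.put(type, bean); }

    static Object getBean(Class<?> type) { return BEANS.get(type); }
}

/** Same shape as the interface above, but backed by the stand-in registry. */
interface SelfAop<T> {
    @SuppressWarnings("unchecked")
    default T getSelf() {
        return (T) BeanRegistry.getBean(this.getClass());
    }
}

/** Made-up service that obtains its managed instance via getSelf(). */
class ReportService implements SelfAop<ReportService> {
    private final String name;

    ReportService(String name) { this.name = name; }

    String name() { return name; }

    // The call is dispatched to the registered instance, not to `this`.
    String whoHandlesSlaveLook() { return getSelf().name(); }
}

class SelfAopDemo {
    static String run() {
        ReportService target = new ReportService("target");
        ReportService registered = new ReportService("registered-bean");
        BeanRegistry.register(ReportService.class, registered);
        return target.whoHandlesSlaveLook();
    }

    public static void main(String[] args) {
        System.out.println(run()); // registered-bean
    }
}
```

The service only needs `implements SelfAop<ReportService>`; every call that must pass through interceptors is then written as getSelf().someMethod().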
4. Root cause analysis
When TestService.test() above runs, execution passes through the following Spring source code:
// DataSourceTransactionManager: opens the connection and binds it to the thread
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
        if (!txObject.hasConnectionHolder() ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        txObject.setReadOnly(definition.isReadOnly());
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }
        prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    }
    catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}
// TransactionSynchronizationManager: stores the holder in a thread-bound map
public static void bindResource(Object key, Object value) throws IllegalStateException {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Assert.notNull(value, "Value must not be null");
    Map<Object, Object> map = resources.get();
    if (map == null) {
        map = new HashMap<>();
        resources.set(map);
    }
    Object oldValue = map.put(actualKey, value);
    if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
        oldValue = null;
    }
    if (oldValue != null) {
        throw new IllegalStateException(
                "Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread");
    }
}
// DataSourceUtils: reuses the thread-bound connection when one exists
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    Assert.notNull(dataSource, "No DataSource specified");
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
        conHolder.requested();
        if (!conHolder.hasConnection()) {
            logger.debug("Fetching resumed JDBC Connection from DataSource");
            conHolder.setConnection(fetchConnection(dataSource));
        }
        return conHolder.getConnection();
    }
    logger.debug("Fetching JDBC Connection from DataSource");
    Connection con = fetchConnection(dataSource);
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        ......
    }
    return con;
}
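Read together, the three methods suggest the cause. doBegin fetches one connection when the transaction starts and binds it to the thread under the data source as key, and doGetConnection returns that bound connection for as long as it is there, so a later change of the routing key has no effect. The following is a simplified model of that behavior, not Spring's or the library's real code: `RoutingDataSource`, `TxResources` and `BoundConnectionDemo` are hypothetical names, and a "connection" is reduced to a string naming the database it points to.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for a routing data source; not the real library class. */
class RoutingDataSource {
    // Plays the role of the lookup key that @DS sets for the current thread.
    static final ThreadLocal<String> LOOKUP_KEY = ThreadLocal.withInitial(() -> "master");

    /** Returns a fake connection: the name of the database currently routed to. */
    String getConnection() {
        return "connection-to-" + LOOKUP_KEY.get();
    }
}

/** Simplified stand-in for the thread-bound resource map and the connection lookup. */
class TxResources {
    private static final ThreadLocal<Map<Object, String>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bind(Object key, String connection) { RESOURCES.get().put(key, connection); }

    static String unbind(Object key) { return RESOURCES.get().remove(key); }

    /** Same shape as doGetConnection: reuse the bound connection if there is one. */
    static String getConnection(RoutingDataSource ds) {
        String bound = RESOURCES.get().get(ds);
        return bound != null ? bound : ds.getConnection();
    }
}

class BoundConnectionDemo {
    /** Switches the key to "slave" inside a transaction and returns the connection used. */
    static String lookupInsideTransaction(boolean suspend) {
        RoutingDataSource ds = new RoutingDataSource();
        // Like doBegin: fetch a connection once and bind it under the data source.
        TxResources.bind(ds, ds.getConnection());
        // Like Propagation.NOT_SUPPORTED: suspending unbinds the connection.
        String suspended = suspend ? TxResources.unbind(ds) : null;
        // Like @DS("slave"): change the routing key for this thread.
        RoutingDataSource.LOOKUP_KEY.set("slave");
        try {
            return TxResources.getConnection(ds);
        } finally {
            RoutingDataSource.LOOKUP_KEY.set("master");
            if (suspended != null) {
                TxResources.bind(ds, suspended); // resume the outer transaction
            }
            TxResources.unbind(ds);              // the outer transaction ends
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupInsideTransaction(false)); // connection-to-master
        System.out.println(lookupInsideTransaction(true));  // connection-to-slave
    }
}
```

In this model the routing key is only consulted when no connection is bound, which matches the observation that @DS works once the surrounding transaction is suspended.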
For DataSourceUtils.doGetConnection(DataSource dataSource) above, the MyBatis call stack that leads to it is:
SimpleExecutor.prepareStatement()
BaseExecutor.getConnection(Log statementLog)
transaction.getConnection();
DataSourceUtils.doGetConnection(DataSource dataSource)
4.1 Related classes
- SpringTransactionAnnotationParser: parses @Transactional at startup.
- TransactionInterceptor: handles @Transactional at execution time.
- DynamicDataSourceAnnotationInterceptor: handles the @DS annotation.