参考:
一、声明式事务
1.1 @Transactional
在分析源码前,我们必须对声明式事务的使用,即@Transactional 注解比较熟悉,因为源码中就是根据注解的属性进行解析,创建不同的事务,特别是传播机制。
@Transactional 注解可以放在类或方法上:
- 类上:指定某个类及子类中所有方法
- 方法上:指定某个方法启用事务
@Transactional 属性如下表所示:
属性 | 默认值 | 描述 |
---|
value/transactionManager | “” | 事务管理器 | propagation | Propagation.REQUIRED | 传播机制 | isolation | Isolation.DEFAULT | 隔离级别,默认使用数据库的隔离级别,例如MySQL的可重复读 | timeout | TransactionDefinition.TIMEOUT_DEFAULT | 事务超时时间,单位:秒,默认不超时 | readOnly | false | 事务只读,默认非只读 | rollbackFor | | 指定哪些异常需要回滚事务,默认当异常是RuntimeException、Error会回滚事务 | rollbackForClassName | | 回滚事务的异常名称 | noRollbackFor | | 指定哪些异常不需要回滚事务 | noRollbackForClassName | | 不需要回滚事务的异常名称 |
1.1.1 propagation:传播机制
传播机制是非常重要的一个属性之一了,参考org.springframework.transaction.annotation.Propagation 枚举类,定义了所有的传播行为,可选值:
- REQUIRED:默认的传播机制,支持当前事务,如果不存在则创建一个新事务
- SUPPORTS:支持当前事务,如果不存在则以非事务方式执行
- MANDATORY:支持当前事务,如果不存在则抛出异常
- REQUIRES_NEW:创建一个新事务,如果存在,则挂起当前事务
- NOT_SUPPORTED:以非事务方式执行,如果存在则挂起当前事务
- NEVER:以非事务方式执行,如果存在事务则抛出异常
- NESTED:如果当前事务存在,则在嵌套事务中执行,其他行为类似于REQUIRED
1.1.1.1 REQUIRED 示例
(1)示例1
testA()、testB()传播机制都是REQUIRED ,所以这两个方法在同一个事务中:
- 不管在什么位置发生了异常,两个方法都会回滚,也就是a、b无法保存至数据库
@Transactional(propagation = Propagation.REQUIRED)
public void testA() {
insert(a);
bService.testB()
}
@Transactional(propagation = Propagation.REQUIRED)
public void testB() {
insert(b);
throws Exception();
}
(2)示例2
testA()不使用事务、testB()使用事务且传播机制都是REQUIRED ,testA()没有事务,testB()会新建一个事务:
- testA()中发生异常:a、b都可以正常保存,因为testB()有自己的事务
- testB()中发生了异常:a可以保存,b无法保存
public void testA() {
insert(a);
bService.testB()
throws Exception();
}
@Transactional(propagation = Propagation.REQUIRED)
public void testB() {
insert(b);
}
(3)示例3
testA()、testB()使用事务且传播机制都是REQUIRED ,testA()和testB()在同一个事务中:
- 若testA()中发生异常,并且捕获了该异常:a、b可以保存,因为捕获了异常,所以相当于正常执行
- 若testB()中发生异常,并且在testA()中捕获了testB()抛出的异常:a、b无法保存
- 因为testB()中发生了异常,造成当前事务回滚,并标记事务为rollback-only
- testA()捕获了异常,所以testA()会走正常的提交事务流程,但是发现事务状态已经标记为回滚了,所以抛出异常:UnexpectedRollbackException: Transaction rolled back because it has been marked as rollback-only,所以也就只能回滚了(debug一下会比较容易理解)
@Transactional(propagation = Propagation.REQUIRED)
public void testA() {
insert(a);
bService.testB()
try {
throws Exception();
} catch (Exception e){
}
}
@Transactional(propagation = Propagation.REQUIRED)
public void testB() {
insert(b);
}
1.1.1.2 NESTED示例
(1)示例1
testA()开启一个事务(父事务)、testB()开启一个嵌套事务(子事务):
- 若testA()中发生异常:a、b无法保存,即父事务发生了异常,父子事务都会回滚
- 若testB()中发生异常:a、b也无法保存,即子事务发生了异常,父子事务都会回滚
@Transactional(propagation = Propagation.REQUIRED)
public void testA() {
insert(a);
bService.testB()
throws Exception();
}
@Transactional(propagation = Propagation.NESTED)
public void testB() {
insert(b);
}
(2)示例2
testA()开启一个事务(父事务)、testB()开启一个嵌套事务(子事务):
- 若testA()中发生异常,并且捕获了该异常:a、b可以保存,因为捕获了异常,所以相当于正常执行
- 若testB()中发生异常,并且在testA()中捕获了testB()抛出的异常:a可以保存,b无法保存,即子事务发生了异常,子事务会回滚,不影响父事务(对比下1.1.1.1中的示例3,都设置为REQUIRED,但是a、b都无法保存的)
@Transactional(propagation = Propagation.REQUIRED)
public void testA() {
insert(a);
try {
bService.testB();
} catch (Exception e) {
}
try {
throws Exception();
} catch (Exception e) {
}
}
@Transactional(propagation = Propagation.NESTED)
public void testB() {
insert(b);
}
其余属性暂时省略…
二、源码分析
声明式事务核心类是org.springframework.transaction.interceptor.TransactionInterceptor ,在类或方法添加@Transactional 注解后,Spring就会基于AOP为该类生成代理类,在调用目标方法时,就会触发TransactionInterceptor#invoke() 方法执行事务。
TransactionInterceptor类图如下图所示:
事务执行的答题流程:
- 获取事务属性
- 加载配置中配置的TransactionManager
- 不同的事务处理方式使用不同的逻辑
- 在目标方法执行前获取事务并收集事务信息
- 执行目标方法
- 出现异常,进行异常处理
- 提交事务前将事务信息清除
- 提交事务
下面具体分析下事务的创建、提交、回滚等相关源码。
2.1 事务执行入口
入口方法在TransactionInterceptor#invoke() 中,主要流程:
- 获取目标类、目标方法
- 调用父类
TransactionAspectSupport#invokeWithinTransaction() 执行事务
源码如下:
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
@Override
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}
2.2 事务执行主流程
事务执行主要发生在TransactionAspectSupport#invokeWithinTransaction() 方法中,主要流程:
- 获取事务属性、事务管理器
- 判断是声明式事务、编程式事务,走不同的分支:
- 声明式事务:执行以下流程:
- 创建事务
- 执行目标方法
- 发生异常则回滚
- 提交事务前将事务信息清除
- 提交事务
- 编程式事务:这里不讨论
源码如下:
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
cleanupTransactionInfo(txInfo);
}
commitTransactionAfterReturning(txInfo);
return retVal;
} else {
}
}
从代码可以发现Spring事务源码整体风格:
- 搭建骨架,也就是通过调用方法,具体逻辑都在各个方法里
- 方法返回值,为下一步骤(方法)做准备
- 通过继承或者说大量利用设计模式,调用父类方法实现,再根据配置选择具体的实现
所以乍一看源码比较清晰,深入看就很复杂,各种抽象类,实现类也很多(代表扩展性很好,很标准的模板设计模式),非常值得学习。。
2.2.1 创建事务
创建事务逻辑在TransactionAspectSupport#createTransactionIfNecessary() 方法中,主要流程:
- 使用 DelegatingTransactionAttribute 封装 事务属性
- 获取事务
- 构造事务信息
protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);
} else {
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
2.1.1.1获取事务
AbstractPlatformTransactionManager#getTransaction() 来处理事务的准备工作,包括事务获取以及信息的构建,主要流程:
- 1.获取事务
- 2.如果当前线程存在事务则转向嵌套事务的处理
- 3.事务超时设置验证
- 4.事务传播机制校验
- 5.构建 DefaultTransactionStatus
- 6.完善 transaction,包括设置 ConnectionHolder、 隔离级别、 timeout,如果是新连接,则绑定到当前线程
@Override
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction();
if (definition == null) {
definition = new DefaultTransactionDefinition();
}
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(definition, transaction, debugEnabled);
}
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
} else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
} catch (Error err) {
resume(null, suspendedResources);
throw err;
}
} else {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
(1)doGetTransaction() :获取事务
主要流程:
- 如果当前线程中存在dataSource的连接,那么直接使用
- 开启允许保存点:是否设置了允许嵌套事务
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
(2)doBegin()
当前线程没有事务时,且目标方法的传播机制要求新建事务,那么就会执行该方法。
主要流程如下:
- 尝试获取连接
- 设置隔离级别以及只读标识
- 更改默认的提交设置
- 设置标志位,标识当前连接已经被事务激活
- 设置过期时间
- 将 connectionHolder绑定到当前线程
- 将事务信息记录在当前线程中
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
(3)prepareSynchronization()
将事务信息记录在当前线程中:新连接、隔离级别、是否只读、事务名称等
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
(4)handleExistingTransaction()
当前线程存在事务,嵌套事务的处理,就会执行该方法。 // TODO
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}catch (RuntimeException beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify 'nestedTransactionAllowed' property with value 'true'");
}
if (useSavepointForNestedTransaction()) {
DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
} else {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
2.1.2 准备事务信息
当已经建立事务连接并完成了事务信息的提取后,将所有的事务信息记录在Transactionlnfo 类型的实例中,这个实例包含了目标方法开始前的所有状态信息,一旦事务执行失败, Spring会通过 Transactionlnfo类型的实例中的信息来进行回滚等后续工作。
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm, TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
txInfo.newTransactionStatus(status);
} else {
}
txInfo.bindToThread();
return txInfo;
}
2.2.2 回滚
当目标方法执行出现异常时,就需要执行回滚操作了,回滚逻辑在TransactionAspectSupport#completeTransactionAfterThrowing() 方法中,主要流程:
- 判断是否有事务:有事务才可能需要回滚
- 是否需要回滚:默认只有时RuntimeException或Error类型才要回滚
代码:
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.hasTransaction()) {
if (txInfo.transactionAttribute.rollbackOn(ex)) {
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
} catch (Error err) {
logger.error("Application exception overridden by rollback error", ex);
throw err;
}
} else {
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
} catch (Error err) {
logger.error("Application exception overridden by commit error", ex);
throw err;
}
}
}
}
2.2.2.1 回滚的条件
默认的回滚条件在DefaultTransactionAttribute#rollbackOn() 方法中,很明显可以看出只有RuntimeException或Error类型的异常才需要回滚,否则就算抛出异常也会正常提交。
public boolean rollbackOn(Throwable ex) {
return (ex instanceof RuntimeException || ex instanceof Error);
}
假如有需要,可以通过@Transactional注解的rollbackFor就可以更改: @Transactl。nal(propagation=Propagation.REQUIRED, rollbackFor=Exception class)
2.2.2.2 回滚处理
回滚逻辑在rollback() 方法里,最终调用了processRollback() 方法完成实际的回滚操作。主要流程:
- 如果事务已经完成,再次回滚就会抛出异常
- 事务没有完成,处理回滚:
- 当前事务有保存点时,使用保存点信息进行回滚(常用于嵌套事务,内嵌的事务异常不会引起外部事务的回滚)
- 当前事务是新事物,直接回滚
- 其他情况,表示有事务但是又不属于上面两种情况,一般是JTA,那么只做回滚标识,等到提交的时候统一不提交
- 自定义触发器的调用:
- 回滚前、完成回滚、回滚发生异常时都需要调用,自定义的触发器会根据这些信息作进一步处理
- 清空记录的资源并将挂起的资源恢复
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus);
}
private void processRollback(DefaultTransactionStatus status) {
try {
try {
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
status.rollbackToHeldSavepoint();
} else if (status.isNewTransaction()) {
doRollback(status);
} else if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
doSetRollbackOnly(status);
} else {
}
} else {
}
} catch (RuntimeException ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
} catch (Error err) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw err;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
} finally {
cleanupAfterCompletion(status);
}
}
2.2.2.3 回滚后的信息清除
回滚逻辑执行结束后,无论回滚是否成功,都必须要做的事情就是事务结束后的收尾工作。主要流程:
- 设置状态是对事务信息作完成标识以避免重复调用
- 如果当前事务是新的同步状态,需要将绑定到当前线程的事务信息清除
- 如果是新事务需要做些清除资源的工作
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
status.setCompleted();
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.unbindResource(this.dataSource);
}
Connection con = txObject.getConnectionHolder().getConnection();
try {
if (txObject.isMustRestoreAutoCommit()) {
con.setAutoCommit(true);
}
DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
} catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
}
txObject.getConnectionHolder().clear();
}
protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder) throws TransactionException {
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
doResumeSynchronization(suspendedSynchronizations);
}
}
}
2.2.3 提交事务
当目标方法正常执行后,就需要执行提交操作了,提交逻辑在TransactionAspectSupport#commitTransactionAfterReturning() 方法中,主要流程:
- 判断是否有事务:有事务才需要提交事务
- 获取事务管理器,执行事务提交操作
代码:
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
if (txInfo != null && txInfo.hasTransaction()) {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
processRollback(defStatus);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
processRollback(defStatus);
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");
}
return;
}
processCommit(defStatus);
}
2.2.3.1 处理事务提交
处理事务提交逻辑在AbstractPlatformTransactionManager#processCommit() 方法中,主要流程如下:
- TransactionSynchronization 中的对应方法的调用
- 有保存点、不是新事务,则不会执行提交事务操作
- 假如有全局回滚标识,但是没有在事务提交中获取到相关异常,则抛出异常
- 处理提交事务过程中出现异常,则进行回滚
- 事务提交完成后执行一些清理
在提交过程中事务也不是直接提交的:
- 当事务状态中有保存点信息,不会去提交事务
- 当事务非新事务的时,不会去提交事务
不能直接提交事务是主要考虑:嵌套事务
- 对于嵌套事务,正常的处理方式是将嵌套事务开始之前设置保存点
- 如果嵌套事务出现异常,根据保存点信息进行回滚
- 如果没有出现异常,嵌套事务并不会单独提交, 而是根据事务由最外层事务负责提交
- 所以如果存在保存点就表示不是最外层事务,不做保存操作(如果不是新事务则不提交也是类似考虑)
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
doCommit(status);
}
if (globalRollbackOnly) {
throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
} catch (TransactionException ex) {
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
} else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
} catch (RuntimeException ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
} catch (Error err) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, err);
throw err;
}
try {
triggerAfterCommit(status);
} finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
} finally {
cleanupAfterCompletion(status);
}
}
2.2.3.2 执行事务提交
这部分逻辑比较简单:
- 获取当前事务使用的连接,执行提交操作
- 捕获SQLException,直接抛出事务异常
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
try {
con.commit();
} catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
2.3 总结
整体上大概过了一遍事务创建、提交、回滚等流程,以及对于各种隔离级别的事务的处理,实际上还有很多细节也没整明白,先放放吧。
建议通过在不同隔离级别下,设置抛出异常的位置,对照实际的结果进行debug,这样比较容易理解源码各个if else分支的含义。
|