IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 动态数据源解析 Spring AOP事务 -> 正文阅读

[Java知识库]动态数据源解析 Spring AOP事务

未来项目规划要为每一个企业创建单独的数据库,所以近期在研究Spring事务对于不同数据源的事务的管理,或者说怎么才能在动态切换数据源的情况下事务对每个数据源都会生效.

我们知道Spring AOP的底层实现有两种方式:一种是JDK动态代理,另一种是CGLib的方式.

事务ACID特性:

  • 原子性 (atomicity): 强调事务的不可分割.
  • 一致性 (consistency):事务的执行的前后数据的完整性保持一致.
  • 隔离性 (isolation): 一个事务执行的过程中,不应该受到其他事务的干扰
  • 持久性(durability) : 事务一旦结束,数据就持久到数据库

Spring事务设计的主要类:

  1. TransactionAspectSupport 事务的基类
  2. TranctionInterceptor 实现事务的基类,并对其进行增强
  3. PlatformTransactionManager 事务管理器
  4. TransactionDefinition 事务定义
  5. TransactionStatus 事务状态

先看正常项目的启动事务管理:
在这里插入图片描述
在这里插入图片描述
因为我们使用的注解声明事务,所以项目启动后会加载配置的数据源属性,并通过DataSourceTransactionManagerAutoConfiguration自动注入默认的事务管理器.

测试时的数据源是模拟不同用户登陆来切换的. 先说测试结果,服务启动后动态切换任何的数据源,Spring都会根据当前的数据库Connection创建一个事务管理器.所以动态数据源下其事务是会生效的.

根据源码来分析
直接从TransactionInterceptor的invoke方法开始,它是AOP声明式事务的入口方法
在这里插入图片描述

public Object invoke(MethodInvocation invocation) throws Throwable {
	// 获取代理的目标类
	Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

	// Adapt to TransactionAspectSupport's invokeWithinTransaction...
	// Spring对Kotlin的支持,我们不走这个,直接看下面的return 
	if (KotlinDetector.isSuspendingFunction(invocation.getMethod())) {
		InvocationCallback callback = new CoroutinesInvocationCallback() {
			@Override
			public Object proceedWithInvocation() {
				return CoroutinesUtils.invokeSuspendingFunction(invocation.getMethod(), invocation.getThis(), invocation.getArguments());
			}
			@Override
			public Object getContinuation() {
				return invocation.getArguments()[invocation.getArguments().length - 1];
			}
		};
		return invokeWithinTransaction(invocation.getMethod(), targetClass, callback);
	}
	return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

invokeWithinTransaction方法是TransactionAspectSupport类中的方法,
三个参数分别是:
methed 调用的方法
targetClass 目标类
invocation 回调

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
			final InvocationCallback invocation) throws Throwable {

		// 获取事务资源并构建事务属性
		TransactionAttributeSource tas = getTransactionAttributeSource();
		final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
		// 构建事务管理器
		final TransactionManager tm = determineTransactionManager(txAttr);
		// 是否是Reactive类型
		if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
			boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
			boolean hasSuspendingFlowReturnType = isSuspendingFunction && COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());
			ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
				Class<?> reactiveType = (isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType());
				ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(reactiveType);
				if (adapter == null) {
					throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
							method.getReturnType());
				}
				return new ReactiveTransactionSupport(adapter);
			});
			Object result = txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
			return (isSuspendingFunction ? (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow((Publisher<?>) result) :
					KotlinDelegate.awaitSingleOrNull((Publisher<?>) result, ((CoroutinesInvocationCallback) invocation).getContinuation())) : result);
		}
		// 重点 将事务管理器包装为PlatformTransactionManager
		PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
		// 返回调用的方法全名称加包名类名
		final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
		// ptm为PlatformTransactionManager 类型,进入判断
		if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
			// 使用 getTransaction 和 commit/rollback 调用进行标准事务划分。
			TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

			Object retVal;
			try {
				// This is an around advice: Invoke the next interceptor in the chain.
				// This will normally result in a target object being invoked.
				retVal = invocation.proceedWithInvocation();
			}
			catch (Throwable ex) {
				// 执行异常操作
				completeTransactionAfterThrowing(txInfo, ex);
				throw ex;
			}
			finally {
			   // 每次执行都会清除事务
				cleanupTransactionInfo(txInfo);
			}

			if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
				// Set rollback-only in case of Vavr failure matching our rollback rules...
				TransactionStatus status = txInfo.getTransactionStatus();
				if (status != null && txAttr != null) {
					retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
				}
			}

			commitTransactionAfterReturning(txInfo);
			return retVal;
		}

		else {
			Object result;
			final ThrowableHolder throwableHolder = new ThrowableHolder();

			// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
			try {
				result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
					TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
					try {
						Object retVal = invocation.proceedWithInvocation();
						if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
							// Set rollback-only in case of Vavr failure matching our rollback rules...
							retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
						}
						return retVal;
					}
					catch (Throwable ex) {
						if (txAttr.rollbackOn(ex)) {
							// A RuntimeException: will lead to a rollback.
							if (ex instanceof RuntimeException) {
								throw (RuntimeException) ex;
							}
							else {
								throw new ThrowableHolderException(ex);
							}
						}
						else {
							// A normal return value: will lead to a commit.
							throwableHolder.throwable = ex;
							return null;
						}
					}
					finally {
						cleanupTransactionInfo(txInfo);
					}
				});
			}
			catch (ThrowableHolderException ex) {
				throw ex.getCause();
			}
			catch (TransactionSystemException ex2) {
				if (throwableHolder.throwable != null) {
					logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
					ex2.initApplicationException(throwableHolder.throwable);
				}
				throw ex2;
			}
			catch (Throwable ex2) {
				if (throwableHolder.throwable != null) {
					logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
				}
				throw ex2;
			}

			// Check result state: It might indicate a Throwable to rethrow.
			if (throwableHolder.throwable != null) {
				throw throwableHolder.throwable;
			}
			return result;
		}
	}
/**
* 根据给定的事务属性创建事务
*/
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
			@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

		// 若没有指定事务名称,默认是调用方法的标识
		if (txAttr != null && txAttr.getName() == null) {
			txAttr = new DelegatingTransactionAttribute(txAttr) {
				@Override
				public String getName() {
					return joinpointIdentification;
				}
			};
		}

		TransactionStatus status = null;
		if (txAttr != null) {
			if (tm != null) {
			  // 定义或获取事务的传播行为
				status = tm.getTransaction(txAttr);
			}
			else {
				if (logger.isDebugEnabled()) {
					logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
							"] because no transaction manager has been configured");
				}
			}
		}
		// 构建事务基础信息
		return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
	}

定义或获取事务的传播行为

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
		throws TransactionException {

	// 如果没有给出事务定义,则使用默认值。
	TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
	// 查看之前是否存在事务
	Object transaction = doGetTransaction();
	boolean debugEnabled = logger.isDebugEnabled();
	// 因为数据源是动态的,每次创建数据源都认为是新的,所以不进入if
	if (isExistingTransaction(transaction)) {
		// Existing transaction found -> check propagation behavior to find out how to behave.
		return handleExistingTransaction(def, transaction, debugEnabled);
	}

	// 检查新事务定义是否合法
	if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
		throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
	}

	// 检查事务的传播行为,并开启事务
	if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
		throw new IllegalTransactionStateException(
				"No existing transaction found for transaction marked with propagation 'mandatory'");
	}
	else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
			def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
			def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
		SuspendedResourcesHolder suspendedResources = suspend(null);
		if (debugEnabled) {
			logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
		}
		try {
		// 最最核心的方法
			return startTransaction(def, transaction, debugEnabled, suspendedResources);
		}
		catch (RuntimeException | Error ex) {
			resume(null, suspendedResources);
			throw ex;
		}
	}
	else {
		// Create "empty" transaction: no actual transaction, but potentially synchronization.
		if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
			logger.warn("Custom isolation level specified but no actual transaction initiated; " +
					"isolation level will effectively be ignored: " + def);
		}
		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
		return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
	}
}

这个方法是事务的精髓核心

/**
	 * Start a new transaction.
	 */
	private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
			boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
		// 根据事务属性设置事务状态
		DefaultTransactionStatus status = newTransactionStatus(
				definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
		// 走的是DataSourceTM的实现 在这个方法里 会根据我们当前动态的数据源连接创建这个数据源的事务,在此之前,所有Bean中,或者是之前事务管理器中加载的都是默认注册时配置注入的数据源. 
		doBegin(transaction, definition);
		// 执行事务同步
		prepareSynchronization(status, definition);
		return status;
	}

以上只是针对我们测试的情况相关的主要源码分析,其他的一些还需要自行阅读源码.整个SpringAop事务源码读下来收获非常多.以上是自己对源码的理解,若有不对请多指教.

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-08-06 09:29:41  更:2021-08-06 09:31:29 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/12 12:05:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码