目前主流的分布式事务解决方案有很多种,主流的主要有:LCN、TCC、TXC三种模式,本文主要讲解LCN的原理。 想看TX-LCN官方文档请传送
[tx-lcn官方文档](https://www.codingapi.com/docs/txlcn-preface/)
一、分布式事务是什么?
分布式事务是基于服务微服务化之后引申出的事务问题; 在单体架构的服务中,通过本地事务一次性解决对数据库数据操作,通过本地事务实现数据的一致性,如下图左侧所示。 在微服务场景下就有可能出现A调用B服务的过程中,B服务成功了,A服务失败了,导致了最终AB数据不一致的情况,如下图右侧所示。 那么要解决分布式事务的问题,其实就是要考虑怎么实现数据的一致性,核心就是CAP定律 BASE理论,根据这个方向引申出了以下几种分布式事务解决方案: 1、TCC TCC即Try、Confirm,Cancle,从翻译的字面意思理解其实就是这个解决方案的原理,对数据的提交和失败都做相应的业务补偿。 一般来说TCC都是指的一种解决方案,如果使用TX-LCN框架,更多指的是对Confirm的提交确认和Cancle的补偿机制,对补偿的业务代码逻辑性要求很高,这类方案其实还有很多需要升级的地方,后续有机会会补充讲解。
该模式对代码的嵌入性高,要求每个业务需要写三种步骤的操作。 该模式对有无本地事务控制都可以支持使用面广。 数据一致性控制几乎完全由开发者控制,对业务开发难度要求高。
2、LCN LCN其实是对2PC模式的一种实现,即通过事务协调者来控制对业务事务的两段提交,和TXC其实算是一类模式,由TC和TM两个组件组成,TC负责客户端的事务代理和TM协作,TM负责管控事务的生命周期,以及记录异常日志。
该模式对代码的嵌入性为低。 该模式仅限于本地存在连接对象且可通过连接对象控制事务的模块。 该模式下的事务提交与回滚是由本地事务方控制,对于数据一致性上有较高的保障。 该模式缺陷在于代理的连接需要随事务发起方一共释放连接,增加了连接占用的时间。
3、TXC TXC的实现原理是在执行SQL之前,先解析SQL,查询影响数据,然后保存执行的SQL快走信息和创建锁。当需要回滚的时候就采用这些记录数据回滚数据库,目前锁实现依赖redis分布式锁控制。
该模式与LCN模式类似同样对代码的嵌入性低。 该模式仅限于对支持SQL方式的模块支持。 该模式由于每次执行SQL之前需要先查询影响数据,因此相比LCN模式消耗资源与时间要多。 该模式不会占用数据库的连接资源。
二、TC和TM
LCN模式的设计思路是由TM和TC组成的,其中图示的LCN代理连接池就是TC对客户端的代理,我们需要对业务工程的本地连接池代理,即需要对业务代码做控制,该控制的实现在Tx-LCN的设计方案里叫TC。
三、TC
3.1读TC的开始
读一个框架,首先要看的是官方文档,通过文档我们可以了解到Tx-Lcn框架整体是对Spring进行了天生依赖,版本是SpringBoot2.x,在后续实现的解释中,我们就可以通过Springboot的官方文档参照研究TC模块的设计模式。
TC模块在初始化过程中使用了很多@Configuration和@ConfigurationProperties的注解对配置进行管理。 例如: 1. 初始化Tx相关的信息 从这里其实可以通过解读该框架理解其他框架也是可以通过类似的方式,读取需要的用户配置。
2. 初始化对连接池和本地事务的代理:
3.2 TC的结构
3.3 一切的开始源于Aspect包
1)代理初始化DataSource连接池;
2)通过对扫描注解@LcnTransaction对所有service服务,实现代理;
package com.codingapi.txlcn.tc.aspect;
import com.codingapi.txlcn.p6spy.CompoundJdbcEventListener;
import com.codingapi.txlcn.tc.config.TxConfig;
import com.codingapi.txlcn.tc.control.TransactionContext;
import com.codingapi.txlcn.tc.jdbc.JdbcTransactionDataSource;
import com.codingapi.txlcn.tc.resolver.AnnotationContext;
import org.springframework.aop.Advisor;
import org.springframework.aop.aspectj.AspectJExpressionPointcut;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AspectConfiguration {
@Bean
@ConditionalOnMissingBean
public TransactionAspectContext transactionAspectManager(TransactionContext transactionContext,
AnnotationContext annotationContext) {
return new TransactionAspectContext(transactionContext, annotationContext);
}
@Bean
public Advisor txTransactionAdvisor(TxTransactionInterceptor txTransactionInterceptor, TxConfig txConfig){
AspectJExpressionPointcut pointcut=new AspectJExpressionPointcut();
pointcut.setExpression(txConfig.getTransactionPointcut());
return new DefaultPointcutAdvisor(pointcut, txTransactionInterceptor);
}
@Bean
public Advisor txDataSourceAdvisor(TxDataSourceInterceptor txDataSourceInterceptor, TxConfig txConfig){
AspectJExpressionPointcut pointcut=new AspectJExpressionPointcut();
pointcut.setExpression(txConfig.getDatasourcePointcut());
return new DefaultPointcutAdvisor(pointcut, txDataSourceInterceptor);
}
@Bean
public TxTransactionInterceptor txTransactionInterceptor(TransactionAspectContext transactionAspectContext){
return new TxTransactionInterceptor(transactionAspectContext);
}
@Bean
public TxDataSourceInterceptor txDataSourceInterceptor(CompoundJdbcEventListener compoundJdbcEventListener, JdbcTransactionDataSource jdbcTransactionDataSource){
return new TxDataSourceInterceptor(compoundJdbcEventListener,jdbcTransactionDataSource);
}
}
3)对TM的监听
TC通过初始化一个ScheduledThreadPoolExecutor的定时任务线程池,实现对TC节点的监听。
public TmServerRunner(TxConfig txConfig, ProtocolServer protocolServer,
SnowflakeStep snowFlakeStep, TxManagerReporter txManagerReporter) {
this.txConfig = txConfig;
this.protocolServer = protocolServer;
this.snowFlakeStep = snowFlakeStep;
this.reporter = txManagerReporter;
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder().setNameFormat("tmServerRunner-pool-%d").build());
}
**
4)初始化连接TM
**
public void init() {
try {
CompletableFuture<Void> futureToNotify = new CompletableFuture<>();
scheduledExecutorService.scheduleAtFixedRate(() -> {
List<InetSocketAddress> iNetSocketAddresses = ListUtil.isEmpty(txConfig.getINetSocketAddresses()) ?
txConfig.txManagerAddresses() : txConfig.getINetSocketAddresses();
iNetSocketAddresses.forEach(address -> {
protocolServer.connectTo(address.getHostString(), address.getPort(), futureToNotify);
futureToNotify.whenCompleteAsync((s, throwable) -> {
log.debug("=> futureToNotify.whenCompleteAsync");
snowFlakeStep.getGroupIdAndLogId();
this.tryToGetMoreTmResource(iNetSocketAddresses);
});
});
}, 0, 30, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
5)基于DDD优化后的切面设计
package com.codingapi.txlcn.tc.aspect;
import com.codingapi.txlcn.tc.control.TransactionContext;
import com.codingapi.txlcn.tc.control.TransactionState;
import com.codingapi.txlcn.tc.control.TransactionStateStrategy;
import com.codingapi.txlcn.tc.info.TransactionInfo;
import com.codingapi.txlcn.tc.resolver.AnnotationContext;
import com.codingapi.txlcn.tc.resolver.TxAnnotation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aopalliance.intercept.MethodInvocation;
import java.lang.reflect.Method;
@Slf4j
@AllArgsConstructor
public class TransactionAspectContext {
private TransactionContext transactionContext;
private AnnotationContext annotationContext;
public Object runWithTransaction(MethodInvocation invocation) throws Throwable {
Method targetMethod = invocation.getMethod();
TxAnnotation txAnnotation = annotationContext.getAnnotation(targetMethod);
if(txAnnotation==null){
return invocation.proceed();
}
TransactionState transactionState = TransactionStateStrategy.getTransactionState();
TransactionInfo transactionInfo = TransactionInfo.current();
if(transactionInfo==null){
transactionInfo = new TransactionInfo(transactionState);
}
transactionInfo.setTransactionType(txAnnotation.getType());
log.debug("run with tx-lcn start...");
Object res = null;
try {
transactionContext.tryBeginTransaction(transactionInfo);
res = invocation.proceed();
transactionInfo.setSuccessReturn(true);
}catch (Exception e){
transactionInfo.setSuccessReturn(false);
throw e;
}finally {
transactionContext.tryEndTransaction(transactionInfo);
transactionContext.clearTransaction();
}
log.debug("run with tx-lcn over.");
return res;
}
}
6)动态的执行步骤
package com.codingapi.txlcn.tc.control;
import com.codingapi.txlcn.tc.info.TransactionInfo;
import java.util.List;
import java.util.Optional;
public class TransactionStepContext {
private List<TransactionStep> transactionSteps;
public TransactionStepContext(List<TransactionStep> transactionSteps) {
this.transactionSteps = transactionSteps;
}
private Optional<TransactionStep> transactionStep(TransactionState type) {
for (TransactionStep transactionStep : transactionSteps) {
if (transactionStep.type().equals(type)) {
return Optional.of(transactionStep);
}
}
return Optional.empty();
}
public void execute(TransactionInfo transactionInfo){
Optional<TransactionStep> transactionStep = transactionStep(transactionInfo.getTransactionState());
transactionStep.ifPresent(step->step.run(transactionInfo));
}
}
四、LCN机制的核心
五、被代理的连接们
六、并不完美的数据最终一致性
总结
|