IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Tx-LCN源码通读-Lcn模式(1) -> 正文阅读

[Java知识库]Tx-LCN源码通读-Lcn模式(1)

目前主流的分布式事务解决方案有很多种,主流的主要有:LCN、TCC、TXC三种模式,本文主要讲解LCN的原理。
想看TX-LCN官方文档请传送


[tx-lcn官方文档](https://www.codingapi.com/docs/txlcn-preface/)

一、分布式事务是什么?

分布式事务是基于服务微服务化之后引申出的事务问题;
在单体架构的服务中,通过本地事务一次性解决对数据库数据操作,通过本地事务实现数据的一致性,如下图左侧所示。
在微服务场景下就有可能出现A调用B服务的过程中,B服务成功了,A服务失败了,导致了最终AB数据不一致的情况,如下图右侧所示。
请添加图片描述
那么要解决分布式事务的问题,其实就是要考虑怎么实现数据的一致性,核心就是CAP定律 BASE理论,根据这个方向引申出了以下几种分布式事务解决方案:
1、TCC
TCC即Try、Confirm,Cancle,从翻译的字面意思理解其实就是这个解决方案的原理,对数据的提交和失败都做相应的业务补偿。
一般来说TCC都是指的一种解决方案,如果使用TX-LCN框架,更多指的是对Confirm的提交确认和Cancle的补偿机制,对补偿的业务代码逻辑性要求很高,这类方案其实还有很多需要升级的地方,后续有机会会补充讲解。

该模式对代码的嵌入性高,要求每个业务需要写三种步骤的操作。
该模式对有无本地事务控制都可以支持使用面广。
数据一致性控制几乎完全由开发者控制,对业务开发难度要求高。

2、LCN
LCN其实是对2PC模式的一种实现,即通过事务协调者来控制对业务事务的两段提交,和TXC其实算是一类模式,由TC和TM两个组件组成,TC负责客户端的事务代理和TM协作,TM负责管控事务的生命周期,以及记录异常日志。

该模式对代码的嵌入性为低。
该模式仅限于本地存在连接对象且可通过连接对象控制事务的模块。
该模式下的事务提交与回滚是由本地事务方控制,对于数据一致性上有较高的保障。
该模式缺陷在于代理的连接需要随事务发起方一共释放连接,增加了连接占用的时间。

3、TXC
TXC的实现原理是在执行SQL之前,先解析SQL,查询影响数据,然后保存执行的SQL快走信息和创建锁。当需要回滚的时候就采用这些记录数据回滚数据库,目前锁实现依赖redis分布式锁控制。

该模式与LCN模式类似同样对代码的嵌入性低。
该模式仅限于对支持SQL方式的模块支持。
该模式由于每次执行SQL之前需要先查询影响数据,因此相比LCN模式消耗资源与时间要多。
该模式不会占用数据库的连接资源。

二、TC和TM

来源Tx-LCN手册LCN模式的设计思路是由TM和TC组成的,其中图示的LCN代理连接池就是TC对客户端的代理,我们需要对业务工程的本地连接池代理,即需要对业务代码做控制,该控制的实现在Tx-LCN的设计方案里叫TC。

三、TC

3.1读TC的开始

读一个框架,首先要看的是官方文档,通过文档我们可以了解到Tx-Lcn框架整体是对Spring进行了天生依赖,版本是SpringBoot2.x,在后续实现的解释中,我们就可以通过Springboot的官方文档参照研究TC模块的设计模式。
通过官方文档知道

TC模块在初始化过程中使用了很多@Configuration和@ConfigurationProperties的注解对配置进行管理。
例如:
1. 初始化Tx相关的信息
初始化Tx相关的信息
从这里其实可以通过解读该框架理解其他框架也是可以通过类似的方式,读取需要的用户配置。

2. 初始化对连接池和本地事务的代理:

在这里插入图片描述

3.2 TC的结构

在这里插入图片描述

3.3 一切的开始源于Aspect包

1)代理初始化DataSource连接池;

2)通过对扫描注解@LcnTransaction对所有service服务,实现代理;

package com.codingapi.txlcn.tc.aspect;

import com.codingapi.txlcn.p6spy.CompoundJdbcEventListener;
import com.codingapi.txlcn.tc.config.TxConfig;
import com.codingapi.txlcn.tc.control.TransactionContext;
import com.codingapi.txlcn.tc.jdbc.JdbcTransactionDataSource;
import com.codingapi.txlcn.tc.resolver.AnnotationContext;
import org.springframework.aop.Advisor;
import org.springframework.aop.aspectj.AspectJExpressionPointcut;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//初始化切点
@Configuration
public class AspectConfiguration {

//这里需要注意ConditionalOnMissingBean注解,该方法是用来构建唯一的事务切点上下文管理,为了保证事务的隔离性。
  @Bean
  @ConditionalOnMissingBean
  public TransactionAspectContext transactionAspectManager(TransactionContext transactionContext,
                                                           AnnotationContext annotationContext) {
    return new TransactionAspectContext(transactionContext, annotationContext);
  }

//1、该方法具体实现了对所有带有@LcnTransaction注解的方法切点,即对本地事务代理的初始化过程。
//2、对@LcnTransaction注解的扫描方式涉及到一个@annotation的切面增强的用法。
//3、依赖注入了txTransactionInterceptor来实现具体的拦截方法
  @Bean
  public Advisor txTransactionAdvisor(TxTransactionInterceptor txTransactionInterceptor, TxConfig txConfig){
    AspectJExpressionPointcut pointcut=new AspectJExpressionPointcut();
    pointcut.setExpression(txConfig.getTransactionPointcut());
    return new DefaultPointcutAdvisor(pointcut, txTransactionInterceptor);
  }

//1、同样需要注意的是@ConditionalOnMissingBean注解,该方法是用来构建唯一的事务切点上下文管理,为了保证事务的隔离性。
//2、数据源切点实现很简单,直接通过TxConfig默认注入的execution(* javax.sql.DataSource.getConnection(..))包来获取切点
//3、依赖注入了txDataSourceInterceptor来实现具体的拦截方法
  @Bean
  public Advisor txDataSourceAdvisor(TxDataSourceInterceptor txDataSourceInterceptor, TxConfig txConfig){
    AspectJExpressionPointcut pointcut=new AspectJExpressionPointcut();
    pointcut.setExpression(txConfig.getDatasourcePointcut());
    return new DefaultPointcutAdvisor(pointcut, txDataSourceInterceptor);
  }


//TxTransaction具体的拦截器
  @Bean
  public TxTransactionInterceptor txTransactionInterceptor(TransactionAspectContext transactionAspectContext){
    return new TxTransactionInterceptor(transactionAspectContext);
  }

//DataSource具体的拦截器
  @Bean
  public TxDataSourceInterceptor txDataSourceInterceptor(CompoundJdbcEventListener compoundJdbcEventListener, JdbcTransactionDataSource jdbcTransactionDataSource){
    return new TxDataSourceInterceptor(compoundJdbcEventListener,jdbcTransactionDataSource);
  }


}

3)对TM的监听

TC通过初始化一个ScheduledThreadPoolExecutor的定时任务线程池,实现对TC节点的监听。

public TmServerRunner(TxConfig txConfig, ProtocolServer protocolServer,
                          SnowflakeStep snowFlakeStep, TxManagerReporter txManagerReporter) {
        //1、tx的配置,具体包含了数据库类型、协议类型、TM节点信息等
        this.txConfig = txConfig;
        //2、使用的通讯服务初始化
        this.protocolServer = protocolServer;
        //3、初始化全局groupid生成器
        this.snowFlakeStep = snowFlakeStep;
        //4、与tm通讯的具体实现类
        this.reporter = txManagerReporter;
        //5、初始化监听tm的线程池
        //6、需要注意的是这里用的是默认ScheduledThreadPoolExecutor,最大线程数是Integer.MAX_VALUE,建议根据实际场景切换自定义最大线程数处理。
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
                new ThreadFactoryBuilder().setNameFormat("tmServerRunner-pool-%d").build());
    }

**

4)初始化连接TM

**


    /**
     * 初始化连接
     */
    public void init() {
        try {
            //1、通过异步的方式,tc在对tm不断获取连接并刷新本地唯一的groupid值
            CompletableFuture<Void> futureToNotify = new CompletableFuture<>();
            scheduledExecutorService.scheduleAtFixedRate(() -> {
                List<InetSocketAddress> iNetSocketAddresses = ListUtil.isEmpty(txConfig.getINetSocketAddresses()) ?
                        txConfig.txManagerAddresses() : txConfig.getINetSocketAddresses();
                iNetSocketAddresses.forEach(address -> {
                    protocolServer.connectTo(address.getHostString(), address.getPort(), futureToNotify);
                    futureToNotify.whenCompleteAsync((s, throwable) -> {
                        log.debug("=> futureToNotify.whenCompleteAsync");
                        //2、不断获取全局的唯一的GroupId并缓存
                        snowFlakeStep.getGroupIdAndLogId();
                        //3、尝试加入更多TM节点。
                        this.tryToGetMoreTmResource(iNetSocketAddresses);
                    });
                });
            }, 0, 30, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }

    }

5)基于DDD优化后的切面设计

package com.codingapi.txlcn.tc.aspect;

import com.codingapi.txlcn.tc.control.TransactionContext;
import com.codingapi.txlcn.tc.control.TransactionState;
import com.codingapi.txlcn.tc.control.TransactionStateStrategy;
import com.codingapi.txlcn.tc.info.TransactionInfo;
import com.codingapi.txlcn.tc.resolver.AnnotationContext;
import com.codingapi.txlcn.tc.resolver.TxAnnotation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aopalliance.intercept.MethodInvocation;

import java.lang.reflect.Method;


// 这个类是对切点的具体实现
// 6.0开始作者对txlcn做了设计上的优化,抽象lcn、tcc、txc模式为动态对象。
@Slf4j
@AllArgsConstructor
public class TransactionAspectContext {

  //事务管理器的上下文
  //负责管理整个事务生命周期
  private TransactionContext transactionContext;

  //注解读取器的上下文
  //在初始化过程中已经对添加@LcnTransaction注解的service实现了缓存
  private AnnotationContext annotationContext;

  public Object runWithTransaction(MethodInvocation invocation) throws Throwable {
    Method targetMethod = invocation.getMethod();

    // 1、读取被切点是否为LCN
    TxAnnotation txAnnotation = annotationContext.getAnnotation(targetMethod);
    if(txAnnotation==null){
      return invocation.proceed();
    }

    // 2、事务状态判定
    TransactionState transactionState = TransactionStateStrategy.getTransactionState();
    TransactionInfo transactionInfo = TransactionInfo.current();
    // 3、当transactionInfo == null表明是开始分布式事务
    if(transactionInfo==null){
      transactionInfo = new TransactionInfo(transactionState);
    }
    // 4、统一设置事务类型
    transactionInfo.setTransactionType(txAnnotation.getType());

    log.debug("run with tx-lcn start...");
    Object res = null;
    try {
      // 5、尝试开始代理事务
      transactionContext.tryBeginTransaction(transactionInfo);
      res = invocation.proceed();
      transactionInfo.setSuccessReturn(true);
    }catch (Exception e){
      transactionInfo.setSuccessReturn(false);
      throw e;
    }finally {
      // 6、尝试结束代理事务
      transactionContext.tryEndTransaction(transactionInfo);
      // 7、尝试结束代理事务
      transactionContext.clearTransaction();
    }
    log.debug("run with tx-lcn over.");
    return res;
  }

}


6)动态的执行步骤

package com.codingapi.txlcn.tc.control;

import com.codingapi.txlcn.tc.info.TransactionInfo;

import java.util.List;
import java.util.Optional;

/**
 * @author lorne
 * @date 2020/3/5
 * @description
 */
public class TransactionStepContext {

    private List<TransactionStep> transactionSteps;

    public TransactionStepContext(List<TransactionStep> transactionSteps) {
        this.transactionSteps = transactionSteps;
    }

    private Optional<TransactionStep> transactionStep(TransactionState type) {
        for (TransactionStep transactionStep : transactionSteps) {
            if (transactionStep.type().equals(type)) {
                // 构建具体的transactionStep对象 例如TransactionStepCreate
                return Optional.of(transactionStep);
            }
        }
        return Optional.empty();
    }

    public void execute(TransactionInfo transactionInfo){
        // 在具体执行事务的操作步骤时通过Optional的ifPresent方法通过lambda表达式动态执行步骤
        Optional<TransactionStep> transactionStep =  transactionStep(transactionInfo.getTransactionState());
        // 翻译后为 TransactionStepCreate.run(transactionInfo)
        transactionStep.ifPresent(step->step.run(transactionInfo));
    }


}


四、LCN机制的核心

五、被代理的连接们

六、并不完美的数据最终一致性

总结

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-12-13 12:40:04  更:2021-12-13 12:42:30 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 6:06:39-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码