IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 分布式事务解决方案和代码落地 -> 正文阅读

[Java知识库]分布式事务解决方案和代码落地

在学习Spring cloud alibaba Seata之前,我们先来了解一下分布式事务的常用解决方案和代码实现,看完出去面试再遇到分布式事务的问题,战无不胜。文章包括了2PC,3PC刚性事务;TCC,本地消息表,可靠性消息,双写对账,最大努力通知,sage事务等柔性事务,并且大多数都有代码参考。

1.什么是分布式事务

完成某一个业务功能可能需要横跨多个服务,操作多个数据库,这个时候,我们本地事务就无法保证每个服务对数据库的操作是同时成功或者同时失败,这个时候就需要我们了解分布式事务。

1.1 分布式事务场景

1.跨库事务 ?

? ? ? ? ?跨库事务指的是,一个应用某个功能需要操作多个库。

2.分库分表

通常一个库数据量比较大或者预期未来的数据量比较大,都会进行水平拆分,也就是分库分表。如下图,将数据库B拆分成了2个库:

对于分库分表的情况,一般开发人员都会使用一些数据库中间件来降低sql操作的复杂性。如,对于sql:insert into user(id,name) values (1,"张三"),(2,"李四")。这条sql是操作单库的语法,单库情况下,可以保证事务的一致性。

但是由于现在进行了分库分表,开发人员希望将1号记录插入分库1,2号记录插入分库2。所以数据库中间件要将其改写为2条sql,分别插入两个不同的分库,此时要保证两个库要不都成功,要不都失败,因此基本上所有的数据库中间件都面临着分布式事务的问题。

3.服务化

在我们微服务中,服务与服务之间通过Rpc远程调用,假如ServiceA 里面调用了B然后调用C,调用B成功了,C失败了,但是服务B缺无法回滚了。或者ServiceA成功调用BC,但是准备提交本地事务的时候宕机了,这个时候B和C已经提交了。这个时候就会造成数据的不一致。

1.2.CAP原则和BASE理论

之前的文章已经介绍了很多,有兴趣可以看一下我之前的博客。

SpringCloud Alibaba(四) Nacos服务端本地启动和源码浅析(AP架构),Distro协议介绍,CAP原则介绍_Dr.劳的博客-CSDN博客_springcloud本地启动

2.分布式事务解决方案(刚性事务--强一致)

2.1 两阶段提交2PC

2.1.1 DTP(Distributed Transaction Processing)模型

如果一个系统操作多个数据库,肯定是有跨多个库的分布式事务的一个问题,在很多年之前全世界,老美早就已经发现这个问题了,很早以前就定义了一整套的解决方案来处理分布式事务的问题 有个叫做X/Open的组织定义了分布式事务的模型(DTP)。

这里面有几个角色,就是

AP(Application,应用程序)

TM(Transaction Manager,事务管理器),

RM(Resource Manager,资源管理器),

CRM(Communication Resource Manager,通信资源管理器)

然后这里定义了一个很重要的概念,就是全局事务,这个玩意儿说白了就是一个横跨多个数据库的事务,就是一个事务里,涉及了多个数据库的操作,然后要保证多个数据库中,任何一个操作失败了,其他所有库的操作全部回滚,这就是所谓的分布式事务。

2.1.2 XA规范

上面这套东西就是所谓的X/Open组织搞的一个分布式事务的模型,那么XA是啥呢?说白了,就是定义好的那个TM与RM之间的接口规范,就是管理分布式事务的那个组件跟各个数据库之间通信的一个接口。

比如管理分布式事务的组件,TM就会根据XA定义的接口规范,刷刷刷跟各个数据库通信和交互,告诉大家说,各位数据库同学一起来回滚一下,或者是一起来提交个事务把,大概这个意思

这个XA仅仅是个规范,具体的实现是数据库产商来提供的,比如说MySQL就会提供XA规范的接口函数和类库实现,等等

2.1.3 两阶段提交协议

X/Open组织定义的一套分布式事务的模型,还是比较虚的,还没办法落地,而且XA接口规范也是一个比较务虚的一个东西,光靠我说的这些东西还是没法落地的 基本上来说,你搞明白了XA也就明白了2PC了,2PC说白了就是基于XA规范搞的一套分布式事务的理论,也可以叫做一套规范,或者是协议。

Two-Phase-Commitment-Protocol,两阶段提交协议 2PC,其实就是基于XA规范,来让分布式事务可以落地,定义了很多实现分布式事务过程中的一些细节。

2.1.4 生活案例

阶段 1
1 A 发邮件给 B C D ,提出下周三去爬山, 问是否同意 。那么此时 A 需要 等待 B C D 的邮件。
2 B C D 分别查看自己的日程安排表。 B C 发现自己在当日没有活动安排,则发邮件告诉 A 它们同意下周三去爬长城。由于某种原因, D 白天没有查看邮 件。那么此时 A B C 均需要等待。到晚上的时候, D 发现了 A 的邮件,然后查看日程安排,发现周三当天已经有别的安排,那么 D 回复 A 说活动取消吧。
阶段 2
1 A 收到了所有活动参与者 的邮件,并且 A 发现 D 下周三不能去爬山。那么 A 将发邮件通知 B C D ,下周三爬长城活动取消。
2 B C 回复 A “太可惜了”, D 回复 A “不好意思” , 至此该事务终止
我们可以发现A即为我们的TM(事务管理器)决策者,B,C,D分别是我们的AP(应用程序),AP他们自己连着RM(数据库),而TM用于通知RM准备,提交,或者回滚。
事务管理器先问问各个数据库你准备好了吗?如果每个数据库都回复ok,那么就正式提交事务,在各个数据库上执行操作;如果任何一个数据库回答不ok,那么就回滚事务。

??

2.1.5 二阶段提交代码落地

我们常用的Mysql,Oracle,PgSql都实现了XA协议,所以可以直接使用他们对于XA协议的支持来实现分布式事务。
流程(这里的TM其实和我们的应用程序是在嵌入一起的,数据库的包已经实现了TM的功能):
  • 1.生成user库和account库的连接对象
  • 2.生成全局事务id和分支事务id(后面会讲解这两个Id的作用)并且绑定,user库一个分支事务Id,account库一个分支数据id。
  • 3.TM向user库(RM)发起请求依次执行xa.start,需要执行的sql,xa.end。
  • 4.TM向account库(RM)发起请求依次执行xa.start,需要执行的sql,xa.end。
  • 5.TM询问user库和account库准备好了吗?
  • 6.两个RM都回答Ok,提交事务
  • 7.如果有一个异常或者回复不Ok,回滚事务。
这里的指令都是数据库对于XA协议的实现,只需要遵守就好了。
xa.start 告诉数据库我要开始了
xa.end 告诉数据库我的sql已经执行完了,但是没提交、
xa.prepare 询问数据库你们只能把好了吗
xa.commit 提交
xa.rollback 回滚
xa.recover(查询事务状态的指令是否在prepare状态)
    public static void main(String[] args) throws SQLException {
        //true表示打印XA语句,,用于调试
        boolean logXaCommands = true;
        // 获得资源管理器操作接口实例 RM1
        Connection conn1 = DriverManager.getConnection
                ("jdbc:mysql://localhost:3306/db_user", "root", "root");
        XAConnection xaConn1 = new MysqlXAConnection(
                (com.mysql.jdbc.Connection) conn1, logXaCommands);
        XAResource rm1 = xaConn1.getXAResource();
        // 获得资源管理器操作接口实例 RM2
        Connection conn2 = DriverManager.getConnection
                ("jdbc:mysql://localhost:3306/db_account", "root", "root");
        XAConnection xaConn2 = new MysqlXAConnection(
                (com.mysql.jdbc.Connection) conn2, logXaCommands);
        XAResource rm2 = xaConn2.getXAResource();
        // AP请求TM执行一个分布式事务,TM生成全局事务id
        byte[] gtrid = "g12345".getBytes();
        int formatId = 1;
        try {
            // ==============分别执行RM1和RM2上的事务分支====================
            // TM生成rm1上的事务分支id
            byte[] bqual1 = "b00001".getBytes();
            Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
            // 执行rm1上的事务分支
            rm1.start(xid1, XAResource.TMNOFLAGS);//One of TMNOFLAGS, TMJOIN, or TMRESUME.
            PreparedStatement ps1 = conn1.prepareStatement(
                    "INSERT into user(name) VALUES ('test')");
            ps1.execute();
            rm1.end(xid1, XAResource.TMSUCCESS);
            // TM生成rm2上的事务分支id
            byte[] bqual2 = "b00002".getBytes();
            Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
            // 执行rm2上的事务分支
            rm2.start(xid2, XAResource.TMNOFLAGS);
            PreparedStatement ps2 = conn2.prepareStatement(
                    "INSERT into account(user_id,money) VALUES (1,10000000)");
            ps2.execute();
            rm2.end(xid2, XAResource.TMSUCCESS);

            // ===================两阶段提交================================
            // phase1:询问所有的RM 准备提交事务分支
            int rm1_prepare = rm1.prepare(xid1);
            int rm2_prepare = rm2.prepare(xid2);

            // phase2:提交所有事务分支
            boolean onePhase = false;
            //TM判断有2个事务分支,所以不能优化为一阶段提交
            if (rm1_prepare == XAResource.XA_OK
                    && rm2_prepare == XAResource.XA_OK) {
                //所有事务分支都prepare成功,提交所有事务分支
                rm1.commit(xid1, onePhase);
                rm2.commit(xid2, onePhase);
            } else {
                //如果有事务分支没有成功,则回滚
                rm1.rollback(xid1);
                rm2.rollback(xid2);
            }
        } catch (XAException e) {
            // 如果出现异常,也要进行回滚
            e.printStackTrace();
        }
    }

执行结果:后面的byte为全局事务和分支事务的id

2.1.6 两阶段提交原理(Seata的AT模式也是这一种模型)

1准备阶段Prepare phase):事务管理器给每个参与者发送prepare消息,每个数据库参与者在本地执行事务,并写本地的Undo/Redo,此时事务没有提交。

Undo日志是记录修改前的数据,用户数据库回滚,Redo日志是记录修改后的数据,用于提交事务后写入数据。)

2提交阶段Commit phase):如果事务管理器接收了参与者执行失败或者超时消息时,直接给每个参与者发送回滚消息,否则发送提交消息;参与者根据事务管理器的指令执行提交或者回滚操作,并释放事务处理过程中使用的锁资源。

2.1.7 两阶段提交的缺陷

1.锁资源(效率低):二阶段提交协议的第一阶段准备阶段不仅仅是回答YES or NO,还是要执行事务操作的,只是执行完事务操作,并没有进行commit或者rollback。也就是说,一旦事务执行之后,在没有执行commit或者rollback之前,资源是被锁定的。这会造成阻塞,如果sql是行锁则锁行,表锁则锁表。

2.局限性:如果数据库没有自己对XA的实现你是无法使用的,什么redis那些都不能使用。

3.单点故障:由于协调者的重要性,一旦协调者TM发生故障。参与者RM会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。

4.数据不一致:在二阶段提交的阶段二中,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,这会导致只有一部分参与者接受到了commit请求,而在这部分参与者接到commit请求之后就会执行commit操作,但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。

2.1.8 两阶段提交协议事务悬挂与解决方案

如果我们执行过程中的时候,宕机了怎么办,或者commit,rollback的时候网络故障了,整个事务流程没有走完,相信很多同学都会有这个想法,这个时候就会锁资源,事务还悬挂在哪,咋办呢?

解决方案:全局事务id和分支事务id这个时候的作用就来了,我们一定要记录下每一个分支事务走到哪一个流程的日志,事务状态是怎么样的,是否已经完成,时间是什么时候。然后定时任务补偿,调用上面提到的XA.RECOVER,他可以查询到所有正在Prepare(悬挂)事务的全局id和分支事务id,根据这些id信息,查询日志悬挂了几分钟的事务,然后根据所有分支事务的状态,我们就能决定执行提交还是回滚。例如一些开源分布式事务框架Atomikos,hmily,已经帮我们实现。

当然,如果最终还是不能提交事务,得人工去处理。

2.1.9 两阶段提交总结

两阶段,我们很少用,一般来说某个系统内部如果出现跨多个库的这么一个操作,是不合规的,而且性能不太行。现在微服务,一个大的系统分成几十个服务。一般来说,我们的规定和规范,是要求说每个服务只能操作自己对应的一个数据库。 如果你要操作别的服务对应的库,不允许直连别的服务的库,违反微服务架构的规范,你随便交叉胡乱访问,这样的一套服务是没法管理的,没法治理的,经常数据被别人改错,自己的库被别人写挂。 如果你要操作别人的服务的库,你必须是通过调用别的服务的接口来实现,绝对不允许你交叉访问别人的数据库。

市面上还有一些2PC的实现框架,例如Atomikos,其实底层都是通过动态代理数据源,拦截sql执行,然后执行xa.start等方法。并且他们很好的帮我们去解决两阶段提交事务悬挂等问题,不需要自己实现。

2.2.三阶段提交 3PC(three-phase-commitment)

2.2.1什么是3PC

三阶段提交(3PC),是二阶段提交(2PC)的改进版本。

与两阶段提交不同的是,三阶段提交有两个改动点:

? ? 1、引入超时机制。同时在协调者和参与者中都引入超时机制。

? ? 2、在第一阶段和第二阶段中插入一个准备阶段。保证了在最后提交阶段之前各参与节点的状态是一致的。也就是说,除了引入超时机制之外,3PC把2PC的准备阶段再次一分为二,这样三阶段提交就有CanCommit、PreCommit、DoCommit三个阶段。

2.2.2 3PC的三个阶段

  • CanCommit阶段

?? ?3PC的CanCommit阶段其实和2PC的准备阶段很像。协调者向参与者发送commit请求,参与者如果可以提交就返回Yes响应,否则返回No响应。

?? ?1.事务询问 协调者向参与者发送CanCommit请求。询问是否可以执行事务提交操作。然后开始等待参与者的响应。

?? ?2.响应反馈 参与者接到CanCommit请求之后,正常情况下,如果其自身认为可以顺利执行事务,则返回Yes响应,并进入预备状态。否则反馈No

  • PreCommit阶段

?? ?协调者根据参与者的反应情况来决定是否可以记性事务的PreCommit操作。根据响应情况,有以下两种可能。

? ? 假如协调者从所有的参与者获得的反馈都是Yes响应,那么就会执行事务的预执行。

?? ?1.发送预提交请求 协调者向参与者发送PreCommit请求,并进入Prepared阶段。?? ?

?? ?2.事务预提交 参与者接收到PreCommit请求后,会执行事务操作,并将undo和redo信息记录到事务日志中。

?? ?3.响应反馈 如果参与者成功的执行了事务操作,则返回ACK响应,同时开始等待最终指令。

? ?假如有任何一个参与者向协调者发送了No响应,或者等待超时之后,协调者都没有接到参与者的响应,那么就执行事务的中断。

?? ?1.发送中断请求 协调者向所有参与者发送abort请求。

?? ?2.中断事务 参与者收到来自协调者的abort请求之后(或超时之后,仍未收到协调者的请求),执行事务的中断。

  • doCommit阶段

? ? 该阶段进行真正的事务提交,也可以分为以下两种情况。

? ??Case 1:执行提交

  • ?? ?1.发送提交请求 协调接收到参与者发送的ACK响应,那么他将从预提交状态进入到提交状态。并向所有参与者发送doCommit请求。
  • ?? ?2.事务提交 参与者接收到doCommit请求之后,执行正式的事务提交。并在完成事务提交之后释放所有事务资源。
  • ?? ?3.响应反馈 事务提交完之后,向协调者发送Ack响应。
  • ?? ?4.完成事务 协调者接收到所有参与者的ack响应之后,完成事务。

? ?Case 2:中断事务?协调者没有接收到参与者发送的ACK响应(可能是接受者发送的不是ACK响应,也可能响应超时),那么就会执行中断事务。

  • ?? ?1.发送中断请求 协调者向所有参与者发送abort请求
  • ?? ?2.事务回滚 参与者接收到abort请求之后,利用其在阶段二记录的undo信息来执行事务的回滚操作,并在完成回滚之后释放所有的事务资源。
  • ?? ?3.反馈结果 参与者完成事务回滚之后,向协调者发送ACK消息
  • ?? ?4.中断事务 协调者接收到参与者反馈的ACK消息之后,执行事务的中断。?

????在doCommit阶段,如果参与者无法及时接收到来自协调者的doCommit或者abort请求时,会在等待超时之后,会继续进行事务的提交。(其实这个应该是基于概率来决定的,当进入第三阶段时,说明参与者在第二阶段已经收到了PreCommit请求,那么协调者产生PreCommit请求的前提条件是他在第二阶段开始之前,收到所有参与者的CanCommit响应都是Yes。(一旦参与者收到了PreCommit,意味他知道大家其实都同意修改了)所以,一句话概括就是,当进入第三阶段时,由于网络超时等原因,虽然参与者没有收到commit或者abort响应,但是他有理由相信:成功提交的几率很大。 )

2.2.3 3PC的缺点

相对于2PC,3PC主要解决的单点故障问题,并减少阻塞,因为一旦参与者无法及时收到来自协调者的信息之后,他会默认执行commit。而不会一直持有事务资源并处于阻塞状态。但是这种机制也会导致数据一致性问题,因为,由于网络原因,协调者发送的abort响应没有及时被参与者接收到,那么参与者在等待超时之后执行了commit操作。这样就和其他接到abort命令并执行回滚的参与者之间存在数据不一致的情况。

2.2.4 2PC,3PC总结

2pc和3pc都是属于刚性事务,性能都会有影响,因为prapare阶段会锁资源,并且我们发现,这种模式大多数使用于单个项目多数据源,并不适合我们分布式的环境远程RPC调用。2PC只有TM的超时机制,3PC新增了参与者(RM)的超时机制,3PC多了CanCommit阶段,这就是最大的区别。

3.分布式事务解决方案(柔性事务--最终一致)

3.1 tcc分布式事务

TCCTry,Confirm,Cancel三个词语的缩写,TCC要求每个分支事务实现三个操作:预处理Try,确认阶段Confirm,撤销Cancel

Try操作做业务检查资源预留confirm业务确认操作Cancel实现一个与Try相反的操作即回滚操作。其实就是一个补偿。

TM(事务管理器,这里只是一个概念)首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所有分支事务的cancel操作;若try成功,TM将会发起所有分支事务的confirm操作,其中confirm/cancel操作若执行失败,TM会进行重试。

采用TCC则认为Confirm阶段是不会出错的。即:只要Try成功,Confirm一定会成功。若Confirm阶段真的出错了,需要引入重试机制或人工处理。

Cancel阶段是在业务执行错误需要回滚的状态下执行分支事务的取消,预留资源释放。通常情况下,采用TCC则认为Cancel阶段也是一定能成功的。若Cancel阶段真的错误了,需要引入重试机制或人工处理。

Try操作可以为空,即什么也不做。例如A转20块给B(两个用户不同库),Try的时候A扣20,B可以什么都不做,comfirm的时候才给B增加20块。

3.1.1 拿用户余额支付下单为例演示tcc

1.扣商品库存

2.生成订单

3.赠送积分

4.扣除余额

在微服务下,这4个操作是对应不同的微服务,每个微服务连接不同的库,所以会产生分布式事务的问题,我们来看一下在TCC下面是如何处理的。

1.try操作

库存服务修改对应商品(原库存100)库存为99,冻结库存为1

订单服务新增记录订单A,状态为待支付

积分服务新增记录积分记录,状态为待生成

用户服务修改用户余额(原余额100)为99,冻结余额为1

2.comfirm操作

当所有try都成功之后执行comfirm

库存服务修改对应商品库存为99,冻结库存为0

订单服务新增记录订单A,状态为已支付

积分服务新增记录积分记录,状态为未使用

用户服务修改用户余额为99,冻结余额为0

3.cancel操作

当有一个try操作有异常执行cancel

库存服务修改对应商品库存为100,冻结库存为0

订单服务修改记录订单A,状态为已取消

积分服务新增记录积分记录,状态为已取消

用户服务修改用户余额为100,冻结余额为0

3.1.2什么是预留资源(try)?

在try操作的时候通常我们会新增多一个字段来记录预留的资源,记录修改前后的一些状态等。

例如执行try操作

新增:

新增一条记录我们会有一个状态字段,我们会插入一条数据然后状态改为待新增,就为预留资源

删除:

删除记录的时候我们把状态改为待删除,状态就为预留资源。

修改:

修改的时候这个字段为将要修改的状态,例如A给B(都有100块)转20块,首先A的余额变为80,冻结余额为-20,B的余额为100,冻结余额为20,A和B这冻结的字段就为预留资源。这里为什么不能直接给B转20,而是加一个冻结字段,因为你直接给B加,B用了120块,那就哦豁,凉了。所以有时候我们需要预留一个字段去记录一下修改前后的一些数据。

3.1.3 TCC代码演示

这里我用的开源框架是byteTcc作为例子,我们只需要看一下如何将一个接口拆分为try,comfirm,cancel,即使你用seata也是一样的流程,框架会自动帮你调用每个微服务对应的try,comfirm,cancel,无需我们自己手动去操作。并且他们会帮你记录事务日志,如果没用框架,哪一个微服务执行了try,confirm,cancel都需要保存在我们事务日志表,保存日志和业务操作在同一个事务提交

代码逻辑:

我们模拟UserClient2转账给UserClient1,调用的UserClient1#transfer,两个client都标注了@Compensable,证明这是一个需要开启分布式事务的方法,这个注解里面有一个接口,这个接口里面的方法就对应Try,例如client1是transfer ,client2是decreaseAmount? ?。

UserClient1:

try:#transfer? ?假设B转账给A的10元,用户A的冻结资金字段+10

confirm:#confirmTransfer? 用户A的真实资金字段+10,冻结资金-10,就是恢复0,表示这次转账完成

cancel:#cancelTransfer? 用户A的冻结资金字段-10

UserClient2:(这里我们冻结字段-10也可以,不必纠结)

try:#decreaseAmount? ?用户B自己的冻结资金字段加上+10,并且真实资金-10

confirm:#decreaseAmountConfirm? 用户B冻结资金 -10,就是恢复0,表示这次转账完成

cancel:#decreaseAmountCancel 用户B的冻结资金 -10,真实资金+10

首先UserClient1#transfer,这个方法里面会通过Feign去扣除UserClient2的钱,并且UserClient2的扣钱方法也是Try方法,然后Client1的本地事务为自己加钱的Try方法,事务提交,框架会帮我们自动调用client1和client2的comfirm方法,如果有异常框架会自动帮我们调用他们的cancel方法。记住切勿自己把异常捕获

从这里看出,TCC分布式事务是我们自己手动写冻结资源,提交,补偿接口,会将一个接口拆分为3个,加大了系统复杂度,但是不会锁资源,并且可控度很高。

? UserClient1:

public interface ITransferService {

	public void transfer(String sourceAcctId, String targetAcctId, double amount);

}
----------------------------------------------------------------------------------------
@Compensable(interfaceClass = ITransferService.class, simplified = true)
@RestController
public class SimplifiedController implements ITransferService {
	
	@Autowired
	private TransferDao transferDao;
	@Autowired
	private IAccountService acctServiceFeign;
	
 /**
*ITransferService 实现了transfer这个方法,默认byteTcc会认为他是Try方法
*/
	@ResponseBody
	@RequestMapping(value = "/simplified/transfer", method = RequestMethod.POST)
	@Transactional
	public void transfer(@RequestParam String sourceAcctId, @RequestParam String targetAcctId, @RequestParam double amount) {
		this.acctServiceFeign.decreaseAmount(sourceAcctId, amount);
		this.increaseAmount(targetAcctId, amount);
	}

	private void increaseAmount(String acctId, double amount) {
		System.out.println("update tb_account_two set frozen = frozen + #{amount} where acct_id = #{acctId}");
		int value = this.transferDao.increaseAmount(acctId, amount);
		if (value != 1) {
			throw new IllegalStateException("ERROR!");
		}
		System.out.printf("exec increase: acct= %s, amount= %7.2f%n", acctId, amount);
	}

	@CompensableConfirm
	@Transactional
	public void confirmTransfer(String sourceAcctId, String targetAcctId, double amount) {
		System.out.println("update tb_account_two set amount = amount + #{amount}, frozen = frozen - #{amount} where acct_id = #{acctId}");
		int value = this.transferDao.confirmIncrease(targetAcctId, amount);
		if (value != 1) {
			throw new IllegalStateException("ERROR!");
		}
		System.out.printf("done increase: acct= %s, amount= %7.2f%n", targetAcctId, amount);
	}

	@CompensableCancel
	@Transactional
	public void cancelTransfer(String sourceAcctId, String targetAcctId, double amount) {
		System.out.println("update tb_account_two set frozen = frozen - #{amount} where acct_id = #{acctId}");
		int value = this.transferDao.cancelIncrease(targetAcctId, amount);
		if (value != 1) {
			throw new IllegalStateException("ERROR!");
		}
		System.out.printf("exec decrease: acct= %s, amount= %7.2f%n", targetAcctId, amount);
	}

}

UserClient2:

public interface IAccountService {
	public void decreaseAmount(String accountId, double amount);
}
-----------------------------------------------------------------------------------------
@Compensable(interfaceClass = IAccountService.class, confirmableKey = "accountServiceConfirm", cancellableKey = "accountServiceCancel")
@RestController
public class AccountController implements IAccountService {
	
	@Autowired
	private JdbcTemplate jdbcTemplate;
	 /**
*IAccountService 实现了decreaseAmount这个方法,默认byteTcc会认为他是Try方法
*/
	@ResponseBody
	@RequestMapping(value = "/decrease", method = RequestMethod.POST)
	@Transactional
	public void decreaseAmount(@RequestParam("acctId") String acctId, @RequestParam("amount") double amount) {
		int value = this.jdbcTemplate.update(
				"update tb_account_one set amount = amount - ?, frozen = frozen + ? where acct_id = ?", amount, amount, acctId);
		if (value != 1) {
			throw new IllegalStateException("ERROR!");
		}
		System.out.printf("exec decrease: acct= %s, amount= %7.2f%n", acctId, amount);

		// throw new IllegalStateException("error");
	}

	@CompensableConfirm
	@Transactional
	public void decreaseAmountComfirm(String acctId, double amount) {
		int value = this.jdbcTemplate.update("update tb_account_one set frozen = frozen - ? where acct_id = ?", amount, acctId);
		if (value != 1) {
			throw new IllegalStateException("ERROR!");
		}
		System.out.printf("done decrease: acct= %s, amount= %7.2f%n", acctId, amount);
	}

	@CompensableCancel
	@Transactional
	public void decreaseAmountCancel(String acctId, double amount) {
		int value = this.jdbcTemplate.update(
				"update tb_account_one set amount = amount + ?, frozen = frozen - ? where acct_id = ?", amount, amount, acctId);
		if (value != 1) {
			throw new IllegalStateException("ERROR!");
		}
		System.out.printf("undo decrease: acct= %s, amount= %7.2f%n", acctId, amount);
	}

	
}

3.1.4 TCC事务会产生的问题一:幂等性

幂等性问题会发生在我们confirm和cancel阶段,try也会但是很少,当我们所有服务调用try接口成功的时候,我们会调用对应服务的confirm或者cancel,这个时候由于网络原因超时了,会有一个retry的重试操作,网络超时代表我们并不知道confirm或者cancel的执行结,假如你进行重试,如果没保证幂等性就会产生数据的错误,所以我们必须要保证幂等性。

解决方案:其实tcc事务执行,会有一个贯穿整个全局事务的全局事务id并且每一个分支事务会有一个分支事务id,我们每个微服务本地需要有一张分支本地事务日志表,里面有的字段,全局事务id,分支事务id,分支事务执行状态(1.try执行成功,2.confirm执行成功,3cancel执行成功),这样我们在重试的时候,首先用分支事务id来作为锁的key,然后去查询本地事务表,我是否执行过这一步操作,如果执行过则不执行,这样就可以保证幂等性。所以我们业务操作每一步操作的时候都需要在本地事务表记录当前分支事务的状态,和业务代码一起提交事务,这样可以回溯分支事务是否完成

例子:

开始全局事务:生成全局事务id=123

分支事务=1,全局事务id=123.? ? 扣商品库存

分支事务=2,全局事务id=123.? ? 生成订单

分支事务=3,全局事务id=123.? ? 赠送积分

分支事务=4,全局事务id=123.? ? 扣除余额

结束全局事务:异步删除对应的事务id

3.1.5 TCC事务会产生的问题二:事务悬挂

还是网络原因产生的问题,假设我们设置了请求的超时时间为3秒,当我们对服务B执行try操作的时候,产生了超时,这个时候我们会调用B对应的cancel接口,就是这么骚,我们的try还没执行,然后执行了cancel,这个时候就有可能产生脏数据。这个时候服务B执行try的线程有空回来了,然后执行了try操作,那就完蛋了,我这个try操作永远都悬挂在这里了,芭比Q,因为我已经执行了cancel。

解决方案:全局事务id,分支事务id,分支本地事务日志表,在我们执行try操作之前,加锁,然后去本地事务表找一下当前有没有执行过cancel操作,有就不执行。

3.1.6?TCC事务会产生的问题三:空回滚

拿下面的下单流程为例,假设步骤1执行try成功了,然后步骤2失败了,这个时候框架会帮我们自动去调用步骤1,2,3,4的cancel方法,然而2,3,4的try方法都还没执行,证明都还没开始预留资源,你他么就把我回滚了,我try还没扣余额,你cancel反倒还给我加余额,赢两次。这就是空回滚问题。

解决方案:全局事务id,分支事务id,分支本地事务日志表,在我们执行cancel操作之前,加锁,然后去本地事务表找一下当前有没有执行过try操作,有就执行,没有就不执行,并且记录本地事务日志表状态。

1.扣商品库存

2.生成订单

3.赠送积分

4.扣除余额

3.1.7?TCC事务会产生的问题三:cancel或者confirm失败

有很多好奇的小朋友就要问,假设我所有try都成功了,或者有一些失败了,需要cancel和confirm的时候因为网络问题失败了,肿么办。不要慌,本地会有个定时任务,定时去本地事务表日志扫描还未完成的事务,假设这个事务所有try都成功,有一部分confirm失败了,定时任务会不断去帮你执行confirm操作,反之cancel。这个时候仍然需要依赖我们全局事务id,分支事务id,本地事务日志表。

如果最后多次重试都失败必须人工补偿,要预警。

3.1.8?TCC事务会产生的问题四:宕机恢复

有很多好奇的小朋友就要问,假设我执行事务的中途宕机了怎么办,不要慌。项目启动的时候去去本地事务日志表扫描还没执行完的事务,然后去询问是不是所有分支事务的try都执行完了啊,如果是,定时任务会不断去帮你执行confirm操作,反之cancel。这个时候仍然需要依赖我们全局事务id,分支事务id,本地事务日志表。

如果最后多次重试都失败必须人工补偿,要预警。

3.1.9 TCC等框架简单原理

本地事务日志表,全局事务id,分支事务id对于TCC分布式事务非常重要,但是一般框架都帮我们给实现了这个功能,包括幂等性,空回滚,失败重试,宕机启动,包括整个try之后confirm或者cancel的自动调用,假如我们自己实现就需要解决这种问题。

一般这种框架原理是动态代理数据源,重写了commit等方法,在分布式事务开启的时候生成全局事务id,通过feign调用的时候,通过feign的拦截器,把全局事务id在请求头带过去,假如调用的服务也是在分布式事务中的话,就生成分支事务id,然后在commit本地事务的时候把当前分支事务的状态也记录下来了,因为在一个库所以能保证原子性。每个微服务本地事务commit之后try阶段结束,框架会自动帮我们调用和我们服务调用链相关联的confirm或者cancel。

3.2.0?TCC分布式事务总结

优点:并发高,不锁资源(2PC会锁数据库资源),本地事务提交即可,一般适用于资金等不能出错的场景。

缺点:复杂,需要一个业务接口需要拆成3个,业务侵入性非常之大,老系统改得你怀疑人生,如果自己实现还得记录本地事务日志,重试,一般团队没实力不会使用这个,因为太骚了。

3.2 本地消息表

  1. A系统在自己本地一个事务里操作同时,插入一条数据到消息表(同一个数据库原子性)
  2. 接着A系统将这个消息发送到MQ中去
  3. B系统接收到消息之后,在一个事务里,往自己本地消息表里(幂等性)插入一条数据,同时执行其他的业务操作,如果这个消息已经被处理过了,那么此时这个事务会回滚,这样保证不会重复处理消息
  4. B系统执行成功之后,就会更新自己本地消息表的状态以及A系统消息表的状态
  5. 如果B系统处理失败了,那么就不会更新消息表状态,那么此时A系统会定时扫描自己的消息表,如果有没处理的消息,会再次发送到MQ中去,让B再次处理
  6. 这个方案保证了最终一致性,哪怕B事务失败了,但是A会不断重发消息,直到B那边成功为止,达到最终一致
  7. 我们发送出去的消息,必须得带上业务相关联的数据,例如新增用户,发送增加积分mq消息,我们的mq消息的消息体得带上新增这个用户的id,以便以后回溯查找问题。
  8. 简单理解:消息本地化,定时任务定时发送消息中间件

3.2.1 本地消息表事例

已用户注册赠送积分为例

1.用户服务插入用户数据

2.在同一个事务中,插入一条赠送积分的数据到本地消息表,状态为未发送。

3.事务提交

4.定时任务定时去本地消息表,找状态是未发送的数据,发送mq,然后状态改为已发送。

5.mq持久化消息。

6.积分服务监听对应的topic,消费消息的时候一定要注意幂等性,因为步骤4有可能重复发送消息,有可能网络问题,导致消息重复发送出去,mq在没有ack的情况下也会一直重复发送,我们的定时任务没有把消息的状态修改,然后定时任务会重复发送。我们要做的是消费消息的时候先用消息id插入本地消息表,保证幂等性,然后再去处理我们的业务。幂等性很多方法都可以保证,不一定需要记录消费的消息。例如消费的时候加上分布式锁,锁里面去查询业务是否完成,如果完成了下一次消息过来,就不消费。消费完消息记得ack,不然mq也会重复投递。

7.(可选).积分服务消费完,告诉用户服务的本地消息表,把对应的消息改为已完成。

8.(可选)定时任务定时去本地消息表,找状态是已发送状态的数据,重复投递。

9.(兜底)如果一个消息多次投递失败,或者多次消费失败就不再投递或者不再重复消费,这个时候我们就得做预警,人工处理。

10.(改进)我们可以本地事务提交之后立即发送mq,然后修改本地消息的状态,这样实时性会好一点,其他还是不变,定时任务用于兜底和重发。

11.(改进)我们可以新增多一个查询接口,让消费端可以来查询我们业务的状态。

3.2.2 本地消息表实现代码

1.UserClient新增用户,并且保存本地消息增加积分表,本地积分表的数据必须带上关联业务的id等,这里我们带上了用户id,因为后续我们出异常了有可能去用户服务查询这个用户是否真的注册了。

2.定时任务投递状态为待发送的消息给mq,并且修改状态为已发送

3.PointClient接受处理消息,先插入消息表,标识已经处理过了,并且这个表有唯一约束,然后,处理业务消息。

4.记得ack,不然mq会重复投递。

5.(可选)业务处理完我们也可以告诉调用方处理完,消息状态改为已完成。

UserClient 用户服务

@Component
public class UserClient {
    @Autowired
    private UserMapper userMapper;
    @Autowired
    private PointLogMapper pointLogMapper;
    //本地事务,可以优化本地事务提交之后立刻发送mq,实时性会好一点
    @Transactional
    public ApiResult save(User user){
        //保存用户
        userMapper.save(user);
        PointLog pointLog=new PointLog();
        pointLog.setUserId(user.getid);
        pointLog.setPoint(1);
        pointLog.setcreateTimeUtc(TimeUtils.getUTCTime());
        pointLog.setstate(0);
        //保存积分日志记录
        pointLogMapper.savePointLog(pointLog);
    }
     
    //本地消息表找未发送mq的消息,发送mq
    @Scheduled(cron = "0 1 0 * * ?")
    public pushMessage(){
        List<PointLog> pointLogList=pointLogMapper.findAllLogUnPush();
        for (PointLog pointLog:
        pointLogList) {
            boolean success=sendMq(JSON.toJsonString(pointLog));
            if(success){
                pointLog.setstate(1);
                pointLogMapper.update(pointLog);
            }
        }
    }
}

PointClient积分服务?

@Component
public class PointClient {
    @Autowired
    private PonitMessageMapper pointMessageMapper;

    @Override
    @Transactional
    public void onMessage(String message){
        PointMessage pointMessage=JSONObject.parseObject(jsonObject.toJSONString(), new TypeReference<PointMessage>(){});
        //插入消息表,唯一约束
        pointMessageMapper.insert(pointMessage);
        //处理逻辑
        pointServeice.addPoint(pointMessage);
        
        message.ack();
    }
}

3.2.3 本地消息表总结

优点:

简单,开发成本低,简单

缺点:

与业务场景绑定,高耦合,不可公用

本地消息表与业务数据表在同一个库,占用业务系统资源,量大可能会影响数据库性能

需要加入MQ中间件,其实不需要也可以,我们自己去调用HTTP这样也可以。

3.3 可靠消息最终一致性方案

这个方案其实就是我们本地消息表的一个变种,实现原理基本也一样。

我们需要一个可靠消息服务,来帮我们把本地消息表还有定时任务的功能给做了,事务开始我们只需要投递一个半消息(消费端不可消费的消息),可靠消息服务把消息持久化(可以看做本地消息表),然后告诉可靠消息服务,我这次业务本地事务是提交还是回滚,如果是提交,我就让这个半消息的状态改变,变为可消费,并且消费端如果消费失败我们可靠消息服务的定时任务会不断投递可消费的消息给消费端,消费端ack。假如可靠消息服务迟迟收不到我是应该把这条半消息提交或者回滚,他会自动去调用发送端的查询接口,让他告诉我应该把这个半消息状态改为可消费,还是应该删除。

例如新增用户,增加积分

1.我用户服务先投递一个增加积分(要带上新增用户的id)的半消息给我们的可靠消息服务,投失败了本地事务不执行,投递成功了积分服务消费端也看不见这条消息。

2.执行本地事务新增用户。

3.本地事务成功,告诉可靠消息服务,你可以把之前的半消息变为可消费了,这样积分服务就能消费这条消息,注意幂等性。

4.本地事务失败,告诉可靠消息服务,你帮我把刚刚的半消息删除吧,我失败了。

5.假设因为网络原因迟迟没有告诉可靠消息服务,可靠消息服务会带着那条消息,来问你的用户服务,我这个用户是否生成了,如果生成了你就回复我用户生成了,我把消息的状态改为可消费,否则删除这条消息。

6.消费端消费和本地消息表一模一样,要告诉可靠消息我消费完了这条消息,你不要再发给我了,也要保住幂等性。

而这个可靠消息服,就是Rocketmq的事务消息功能,他完成了我们上述的所有功能。

3.3.1 rocketmq事务消息

RocketMQ的设计中broker与producter端的双向通信能力,使得broker天生可以作为一个事务协调器存在,而RocketMQ本身提供的存储机制为事务消息提供了持久化能力,RocketMQ的高性能机制以及可靠消息机制则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。 ? ? 在RocketMQ 4.3后实现了完整的事务消息,实际上其实是对本地消息表的要给封装,将本地消息移动到了MQ内部,解决Producter端的消息发送与本地事务执行的原子性问题。

执行流程如下:

为方便理解我们还以注册送积分的例子来描述 整个流程。

Producer 即MQ发送方,本例中是用户服务,负责新增用户。MQ订阅方即消息消费方,本例中是积分服务,负责新增积分。

1、Producer 发送事务消息

Producer (MQ发送方)发送事务消息至MQ ServerMQ Server将消息状态标记为Prepared(预备状态),注:意此时这条消息消费者(MQ订阅方)是无法消费到的。

本例中,Producer 发送?”增加积分消息MQ Server

2、MQ Server回应消息发送成功

MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。

3、Producer 执行本地事务

Producer 端执行业务代码逻辑,通过本地数据库事务控制。

本例中,Producer 执行添加用户操作。

4、消息投递

若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将增加积分消息状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息;

若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删除增加积分消息

MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack

5、事务回查

如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他?Producer 来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。

以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可。

3.3.2 rocketmq事务消息代码落地

同样以新增用户,增加积分为例子。

用户服务新增用户,而这里的积分服务消费消息,注意保证幂等性。

注意我们有一张事务表,来保存对应的事务id是否已经完成,用于回查,所以我们需要一个事务id贯穿整个事务,下面用的txNo。

AccountClient 用户服务

1.accountInfoController#accountCreateMessage

我们使用UUID生成了一个TxNo作为事务id,用户回查事务,然后调用service的发送新增用户消息。

@RestController
@Slf4j
public class AccountInfoController {
    @Autowired
    private AccountInfoService accountInfoService;

    @PostMapping(value = "/create")
    public String createAccount(@RequestParam("accountNo")String accountNo){
        //创建一个事务id,作为消息内容发到mq
        String tx_no = UUID.randomUUID().toString();
        AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,tx_no);
        //发送消息
        accountInfoService.createAccountMessage(accountChangeEvent);
        return "创建用户";
    }
}

2.accountInfoService#createAccountMessage 这个方法里面发送了rocketmq事务消息到producer_group_txmsg,这个时候是一个半消息,消费端看不见。

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Autowired
    AccountInfoDao accountInfoDao;

    @Autowired
    RocketMQTemplate rocketMQTemplate;


    //向mq发送新增用户消息
    @Override
    public void createAccountMessage(AccountChangeEvent accountChangeEvent) {

        //将accountChangeEvent转成json
        JSONObject jsonObject =new JSONObject();
        jsonObject.put("createAccount",accountChangeEvent);
        String jsonString = jsonObject.toJSONString();
        //生成message类型
        Message<String> message = MessageBuilder.withPayload(jsonString).build();
        //发送一条事务消息
        /**
         * String txProducerGroup 生产组
         * String destination topic,
         * Message<?> message, 消息内容
         * Object arg 参数
         */
        rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg","topic_txmsg",message,null);

    }

    //创建用户
    @Override
    @Transactional
    public void createAccount(AccountChangeEvent accountChangeEvent) {
        //幂等判断
        if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){
            return ;
        }
        //创建用户
        accountInfoDao.createAccount(accountChangeEvent.getAccountNo());
        //添加事务日志
        accountInfoDao.addTx(accountChangeEvent.getTxNo());
    }
}

3.执行本地事务

当我们发送了半消息之后,会回调ProducerTxmsgListener#executeLocalTransaction,因为我做了监听@RocketMQTransactionListener

这个时候我们可以拿到发送出去的message,执行上面本地事务accountInfoService#createAccount,保存用户数据,并且保存事务日志(插入txNo(事务id))。


4.事务回查

ProducerTxmsgListener#checkLocalTransaction 查询本地事务是否完成

由于我们Message里面有一个txNo事务id,当我们执行ProducerTxmsgListener#executeLocalTransaction的时候我们会告诉rocketmq,这次本地事务是成功还是失败,如果成功了会插入事务日志表,失败就没有这条数据。

由于网络不稳定,我们告诉rocketmq的消息他没收到,rocketmq就会带着消息体定时调用我们的

ProducerTxmsgListener#checkLocalTransaction,我们从里面拿出txNo,查询当前事务是否执行完成,如果执行完成,就返回commit,消息可以对消费者可见。

@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {

    @Autowired
    AccountInfoService accountInfoService;

    @Autowired
    AccountInfoDao accountInfoDao;

    //事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {

        try {
            //解析message,转成AccountChangeEvent
            String messageString = new String((byte[]) message.getPayload());
            JSONObject jsonObject = JSONObject.parseObject(messageString);
            String accountChangeString = jsonObject.getString("accountChange");
            //将accountChange(json)转成AccountChangeEvent
            AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
            //执行本地事务,创建用户
            accountInfoService.createAccount(accountChangeEvent);
            //当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }


    }

    //事务状态回查,查询是否执行成功
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        //解析message,转成AccountChangeEvent
        String messageString = new String((byte[]) message.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String accountChangeString = jsonObject.getString("accountChange");
        //将accountChange(json)转成AccountChangeEvent
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //事务id
        String txNo = accountChangeEvent.getTxNo();
        int existTx = accountInfoDao.isExistTx(txNo);
        if(existTx>0){
            return RocketMQLocalTransactionState.COMMIT;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

PointClient 积分服务

这里就不再写代码了,消费端和本地消息表的代码一模一样。注意好幂等性就好了,新增完积分然后给mq返回ack。

注意如果消费一直失败,需要预警 人工处理。

3.3.3 可靠消息最终一致总结

好处:

RocketMQ主要解决了两个功能:

1.本地事务与消息发送的原子性问题。

2.事务参与方接收消息的可靠性。 可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。

3。性能比本地消息表好,能承受并发更高

坏处:

如果用rocketmq事务消息,强依赖了rocketmq;我们自己也可以按照rocketmq的这种自己实现一个可靠消息服务,但是增加了研发成本和系统复杂度。

3.4?最大努力通知

我们平常在接入微信支付的时候,只要用户支付成功了,微信会已一定的频率回调我们,假如我们回复微信失败了,他会过一段时间再通知,最多通知15次。

15次之后,我们就只能主动去微信那边查询这笔订单是否成功。微信的这个服务就是最大努力通知

通过例子我们总结最大努力通知方案的目标:
目标:发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。
具体包括:
1、有一定的消息重复通知机制。
因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。
2、消息校对机制。
如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。
下面就是我们往微信充值的交互流程。

3.4.1 最大努力通知实现方案

由于这里的代码比较简单,就是简化版的可靠消息,这里就不写了,只列出实现方案。

本方案是利用MQ ack 机制由 MQ 向接收通知方发送通知,流程如下:
1. 发起通知方将通知发给 MQ 。使用普通消息机制将通知发给MQ
? 注意:如果消息没有发出去可由接收通知方主动请求发起通知方查询业务执行结果。
2. 接收通知方监听 ?MQ
3. 接收通知方接收消息,业务处理完成回应 ack
4. 接收通知方若没有回应 ack MQ 会重复通知。
MQ 会按照间隔 1min 5min 10min 30min 1h 2h 5h 10h 的方式,逐步拉大通知间隔? (如果 MQ 采用 rocketMq ,在 broker 中可进行配置),直到达到通知要求的时间窗口上限。
5. 接收通知方可通过消息校对接口来校对消息的一致性。
6.接收方定时任务去查询一段时间未完成的业务,调用发送方的查询接口。
7.一般来说这种方案都是我们和第三方去交互才使用的,如果第三方调用我们,我们就去实现最大努力通知服务,不停回调第三方结果,直到最大次数。如果我们去调用微信,则微信需要实现最大努力通知。
8.内部调用也可以这样子,一般用于一些不太重要的通知。

?3.4.2 最大努力通知和可靠性消息的异同

1.解决方案思想不同

--可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。

-- 最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。
2. 两者的业务应用场景不同
? 可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。
? 最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。
3. 技术解决方向不同
可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。
最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)。

3.5 业务性分布式事务(日志加延时消息)

有一些业务,我们可以利用他的特殊性,去避免分布式事务(最终一致),有点类似TCC的思想,也是二阶段提交+事后补偿机制。
场景一:下单扣库存,发送优惠券,创建订单( 特殊性。库存可以少卖但是不可以超卖,我可以多扣,然后定时任务把多扣的补偿回去
步骤一:预订单(支付时间30秒)
---orderClient(订单服务)逻辑:远端优先扣库存,然后发送积分(优惠券也可以),创建订单生成OrderNo,作为贯穿我们整个业务的唯一标准。假设我们生成的orderNo=123;
---storeClient(库存服务)逻辑:
1.update product_store set?store=store-1 and lock_store=lock_store+1 where product_id=1 and store-1>0;?
2.insert sotre_log set lock_store=1,orderNo=123,state=UNFINISHED.
锁库存,库存表库存-1,冻结库存字段+1,日志记录orderNo=123这张订单锁了一个库存,状态为未完成。
3.发送延时消息,30分钟后去查询订单是否完成
如果没完成就本地事务释放库存,日志记录修改为失败,如果已完成,就在本地事务把冻结的库存释放,日志状态改为已完成
---pointClient(积分服务)逻辑:
1.insert point_log set point=1,orderNo=123,state=UNFINISHED.
积分记录表新增记录,表示订单123会增加1积分,但是状态是未完成。
2.发送延时消息,30分钟后去查询订单是否完成, 如果没完成就把日志记录改为失败,如果成功就在本地事务为用户加积分,并且积分日志记录表对应的记录状态改为已完成
---orderClient 生成订单,订单状态为预订单,返回给前端。
步骤二:用户支付之后,才去根据之前记录的日志去真正的扣库存,增加积分,优惠券等等,这些问服务去根据日志做真正的操作,是在本地事务操作的,所以不会有分布式事务的问题。
步骤三:我们还需要一个定时任务兜底,定期去这些日志表去查询未完成的日志,然后回查订单状态,订单成功则走对应流程,订单不存在也走对应的流程(上面红色字体)。
步骤四:如果用户取消订单,也是走的对应流程(上面红色字体)。
优点:避免了分布式事务
缺点:如果下单某个步骤异常失败了,我们只能等待延时任务去释放库存,会把库存锁定一段时间。
    @Transactional
    public Result createOrder(Product product){
        //生成订单编号
        String orderNo= RandomUtil.ranDomStr(5);
        DeductProduct decute=new DeductProduct();
        decute.setCount(product.getCount);
        decute.setProductId(product.getId);
        decute.setOrderNo(orderNo);
        //feign调用库存服务减库存
        boolean success=storeFeignClient.decute(decute);
        if(!success){
            return "error";
        }
        //feign给用户增加积分
        Point point=new Point();
        point.setPoint(10);
        point.setOrderNo(orderNo);
        //feign调用库存服务减库存
        boolean success1=pointFeignClient.addPOint(point);
        if(!success1){
            return "error";
        }
        //插入本地事务订单
        Order order=new Order();
        BeanUtils.copyProperties(product,order);
        order.setOrderNo(orderNo);
        OrderMapper.insert(order);
    }

场景二:我们做的共享充电宝下单也有这个特性(我可以把充电宝锁定,但是不能充电宝弹出但是没生成订单,他们两个是分开的微服务)

1.去查询现在有哪个充电宝可以弹出

2.假设槽位为1的充电宝可以弹出,先把他的状态改为待租借,这个状态的充电宝没有任何人可以再次租借,直到他状态被修改。这种状态的充电宝,无论你订单是否成功生成,我最后通过补偿都能改变这个状态,并且保证我的充电宝不会因为分布式事务导致订单没生成,充电宝弹出。

3.然后再生成预订单,即状态为预订单

4.订单事务提交之后,异步调用阿里云IOT的下发指令,弹出槽位1的充电宝

5.弹电结果返回弹出槽位1的充电宝,充电宝状态改为使用中,订单改为使用中,弹电返回失败,则改为可租借,订单改为失败。

6.假如没有反馈,有两种情况,

第一种:订单被回滚了,我们定时任务去查询待租借状态的充电宝,看看他有没有使用中的订单,如果有则改为租借中,如果无则改为可租借

第二种:因为网络问题机柜没上报,我们可以主动去查询槽位1的电池还在不在,也可以根据机柜上报的心跳,知道槽位1的充电宝是否还在,从而改变充电宝和订单的状态。

7.我订单服务和机柜服务这样就能依靠业务的特殊性,去规避分布式事务,并且事后能根据状态和日志补偿。待租借状态的充电宝就像锁资源,我不管你订单有没有生成,一直是待租借状态的充电宝是不可能是被弹出的。

3.5.1 业务性分布式事务总结

优点:可以规避分布式事务,然后通过定时任务补偿,达到最终一致
缺点:不是通用性方案,依赖业务的特殊性,比较类似TCC二阶段提交,可以做这种操作。

3.6 同步双写(多写)+异步对账

在我们的业务中,我们经常会对一些表进行双写,那么双写的过程中,因为是多个库,又会产生分布式事务的问题。
例如订单系统是分库分表的,订单通常会有两个查询维度,一个是买家,一个是买家,如果按买家分库,卖家查询就不好实现;如果按卖家分库,按买家的查询就不好实现。我们为了解决这种问题通常会把订单数据冗余一份,按买家分库分表存一份,卖家分库分表再存一份。那么一个订单要向两个库写入数据,如何保证原子性。

3.6.2 同步双写事务代码落地

没啥好写的,其实就是根据你的业务,去写定时任务,一个全量一个增量。

3.6.1?同步双写事务解决方案

对账! 对账有个关键点,要有对账的基准,我们需要知道用谁去对谁,通过对比两个数据可,发现不一致的数据,进行补偿。所以我们业务采取双写的时候:其中一个数据可成功了,另外的数据库或者缓存不管是否写成功,都给客户端返回成功,然后以该数据可为基准,校对另一个数据库或者缓存。
在实现层面,对账有以下几种实现方式
1.异步消息对账:
每一次业务采用双写操作,如果部分写成功,就抛出一条消息到消息中间件,然后由一个消费者消费这条消息,对两个数据可中的数据进行对比,用正确的以防补齐缺的一方,当然消息可能丢失,无法百分百的宝珠,需要下面的全量后台任务对账兜底。
2.全量后台任务对账:
例如,每天晚上有一个定时任务,全量对比前一天两个库的所有数据,去做补齐。
3.增量后台任务对账:
基于数据库的更新时间,每隔一段时间只对账增量数据。

3.6.2?同步双写(多写)+异步对账总结

优点:不需要去改动业务代码,只需要写好对账的定时任务即可。
缺点:频繁扫描数据库有可能造成压力过大。

3.7 阿里巴巴Seata框架

下一篇博客我会重点说AT模式,现在我们先了解一下,下一章会有代码落地。
我们读了上面的XA模式,我们知道他是一个二阶段提交的模式,并且在一阶段prepare的时候会在数据库层面写undo_log,redo_log Undo 日志是记录修改前的数据,用户数据库回滚, Redo 日志是记录修改后的数据,用于提交事务后写入数据。)
我们先说seata的AT模式,他的事务模型和XA模式基本一致,他也是一个二阶段提交的模式。
一阶段:
生成逆向sql,并且保存到对应的Undo_log表,我们每个微服务对应的数据库他都需要这张表。假设我们要往订单表insert数据,那么逆向sql就是delete,如果是update操作,逆向sql就是update之前的数据。
本地提交事务(业务和undo_log表一起在本地事务提交)。
二阶段:
如果所有业务成功,则删除undo_log表的记录,大家无事发生,因为我的本地事务在一阶段就提交了。
如果有一个业务失败,就根据undo_log表记录的数据,去反向补偿数据。
为了保证执行全局事务的时候保证数据不会被修改,需要加一个全局锁,其实就是把你要修改的库还有每一条记录的id收集起来。其他事务如果想修改这些数据必须看我有没有全局锁,有你就不能改。

?3.7.1 seataAT常见问题与XA对比

1.全局事务还没完成,数据被其他事务修改

假设一个请求基于 seata 对一条数据加了全局锁,并进行了本地分支事务提交。此时全局事务没有提交,另一个请求过来(这个请求就是简单的更新这条数据,不是分布式事务)自然就不需要获取全局锁,也就是可以直接更新。那全局事务再进行回滚时是不是就有问题了?

没错,描述的这个场景是有问题的,这也正是我们开发时需要考虑的地方,当有可能操作同一条数据引起并发问题的时候,给他加上全局事务,去获取全局锁。

2.既然seata也有全局锁,那么和我XA有什么区别,大家一阶段都是锁资源。

架构层次方面,传统2PC方案的 RM 实际上是在数据库层,RM 本质上就是数据库自身,通过 XA 协议实现,而 Seata的 RM 是以jar包的形式作为中间件层部署在应用程序这一侧的。 两阶段提交方面,传统2PC无论第二阶段的决议是commit还是rollback,事务性资源的锁都要保持到Phase2完成 才释放。而Seata的做法是在Phase1 就将本地事务提交,这样就可以省去Phase2持锁的时间,整体提高效率。

简单来说,就是seataAT模式一阶段已经提交本地事务了,不会再持有数据库资源,但是别人想修改,必须先获取全局锁(所需要修改数据Id的集合)。而XA模式只要你事务一天不提交,我数据库就他么锁着这些数据,谁也不可以动,本人亲测,在XA执行的时候抛出异常,你一天没提交事务,那一行就这么被锁着,不能修改,如果修改的数据没有条件,就是表锁,是表锁就等着爆炸吧。所以seata at模式和我们2pc的XA很像,只不过seata是在一阶段先提交了数据,后续根据undo_log补偿。

3.什么是全局锁

假如我的用户13下单操作,需要锁商品id为1和id=5的库存,需要给用户新增积分,需要修改用户的余额,那么我们会把商品库存表id=1和id=5,用户表的id=13,全部收集起来,用tableName+id作为唯一索引,插入数据库,商品库存表锁了2个id就插入两条数据。(新增的数据不会有全局锁)

如果其他事物想去修改对应的数据,就必须先去获取全局锁,看看我这次修改的id在不在全局锁,如果在就在一段时间内循环获取全局锁。? ? ? ??

4.假如全局事务回滚,他会根据你的Undo_log日志去补偿之前的数据,假如是update操作他发现,undo_log存的数据和现在的数据不一致(所以需要全局锁),则会异常,需要人工补偿。

5.用了seata at模式,我们就不能随意update数据库的字段了,不然会死得很惨。

3.7.2 代码落地(下一篇博文会有详细介绍)

实际上seata AT模式是无侵入的,只需要加一个注解

1、全局事务开始使用 @GlobalTransactional标识 。

2、每个本地事务方案仍然使用@Transactional标识。

3、每个数据都需要创建undo_log表,此表是seata保证本地事务一致性的关键。

4、不管你本地事务还是分布式事务,只要你业务需要分布式事务加了@GlobalTransactional,那么本地事务也得加这个@GlobalTransactional,去获取全局锁,假如我的下单,需要锁库存,新增积分,我给这个流程加了@GlobalTransactional。那么假如用户退款,我们需要修改订单状态,这个操作不需要分布式事务,只需要本地事务,我们也得加@GlobalTransactional注解,他要去获取全局锁,没有全局锁才可以操作。

5、@GlobalTransactional这个注解有点重,他会http去注册全局事务和分支事务,再去获取全局锁,如果我们只有本地事务的存在的时候可以用@GlobalLock去获取全局锁,他不会去注册全局和分支事务,只会去获取全局锁。

3.7.3 seata AT模式总结

优点:业务无侵入,我们只需要加一个注解就可以了,原理都是代理数据源,增强commit等方法。

缺点:全局锁如果没有加好,其他事务在全局事务没执行完就去修改数据,那就等着原地爆炸吧。

比较适合一些数据只有少数人可以修改的场景,例如我们的后台服务,对应的人只能操作自己的数据。而且每一步关系到这些表的数据修改操作,都得去获取全局锁,造成性能问题。

3.8 saga事务

其实这个模式很少用,我们来看一下他的简单介绍。
TCC事务,其实就是将一个接口拆分为3个接口,try -> confirm -> cancel,try成功了就confirm,try失败了就cancel。那如果confirm或者cancel失败了呢?confirm/cancel失败了就是一直重试。

saga是将每个接口,拆分为2个接口,一个是业务接口,另外一个是补偿接口,相当于就是说将tcc里面的try和confirm合并为一个接口,就是先执行业务接口,直接就尝试完成整个业务逻辑的操作然后如果在服务调用链条里面,某个服务的业务接口执行失败了,那么直接对已经执行成功的所有服务都调用其补偿接口,将之前执行成功的业务逻辑给回滚。
saga这个东西,其实核心和本质,就是把每个操作分为实际的业务逻辑以及补偿业务逻辑,正常情况下,就依次执行各个服务的业务逻辑就好了,如果某个服务调用失败的话,直接对之前执行成功的那些服务都依次执行补偿逻辑。
saga事务的思想,就是将一个长的分布式事务,拆分为一连串的每个服务的本地事务,然后每个服务对每个接口提供两个接口,一个是业务接口,一个是回滚的补偿接口,正常情况下就是依次的进行调用。
异常情况下,就对之前已经执行成功的服务执行补偿接口,回滚业务逻辑。

4.分布式事务选型

刚性事务,强一致(2PC,3PC)一般不选,性能问题,但是是强一致,一般我们后台服务可以用一下Seata的XA模式。
柔性事务,最终一致,基于重试和补偿。

1.采用可靠消息一致性方案
可靠消息一致性要求只要消息发出,事务参与者接到消息就要将事务执行成功,不存在回滚的要求,一旦做了就要硬着头皮使用。适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消 息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。典型的使用场景:注 册送积分,登录送优惠券等。

2.采用最大努力通知方案
最大努力通知表示发起通知方执行完本地事务后将结果通知给事务参与者,即使事务参与者执行业务处理失败发起通知方也不会回滚事务,一旦做了就要硬着头皮使用。典型的使用场景:银行通知、支付结果。通知等。

3.TCC
如果在非异常状态下,TCC其实是一个同步的过程,并且出问题了可以回滚,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能。

4.seata AT
一些后台系统非常推荐,同一个数据不会有多个人修改的场景,业务无侵入。

5.同步双写,异步对账
这个不用说了,只要多个库双写,基本上都用这个方案。

6.终极方案,人工补偿
这个也不说了,乖乖去找日志写sql补偿吧。

如果是非常强一致,和资金类挂钩建议用TCC,TCC使用的细节太多了,如果不是非常强的团队很容易出问题,业务侵入性非常大,一个接口得改3.
日常用的比较多的是可靠消息一致性(本地消息表),加上对账,一般能满足我们的需求,最多就出问题根据人工补数据,本人已经深刻体会多次人工补数据的痛苦。

5.总结

分布式事务在业界中始终没有一个完美的解决方案,基本上都是靠重试+日志补偿,懂了这些之后我们发现这个问题,并没有想象中那么可怕,能不用分布式事务,就不用,靠重试+补偿(其实也是柔性事务)。
在条件允许的情况下,我们尽可能选择本地事务单数据源,因为它减少了网络交互带来的性能损耗,且避免了数据 弱一致性带来的种种问题。若某系统频繁且不合理的使用分布式事务,应首先从整体设计角度观察服务的拆分是否合理,是否高内聚低耦合?是否粒度太小?分布式事务一直是业界难题,因为网络的不确定性,而且我们习惯于拿分布式事务与单机事务ACID做对比。 无论是数据库层的XA、还是应用层TCC、可靠消息、最大努力通知等方案,都没有完美解决分布式事务问题,它们不过是各自在性能、一致性、可用性等方面做取舍,寻求某些场景偏好下的权衡。

参考万字长文详解Shardingsphere对XA分布式事务的支持 - 沧海5的个人空间 - OSCHINA - 中文开源技术交流社区

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-06-16 21:31:54  更:2022-06-16 21:33:13 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 19:18:01-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码