前言
当业务量级扩大之后的分库,以及微服务落地之后的业务服务化,都会产生分布式数据不一致的问题。既然本地事务无法满足需求,因此分布式事务就要登上舞台。
? 我们有必要先来了解下 CAP 原则和 BASE 理论。CAP 原则是 Consistency(一致性)、Availablity(可用性)和 Partition-tolerance(分区容错性)的缩写,它是分布式系统中的平衡理论。
- 一致性要求所有节点每次读操作都能保证获取到最新数据;
- 可用性要求无论任何故障产生后都能保证服务仍然可用;
- 分区容错性要求被分区的节点可以正常对外提供服务。
事实上,任何系统只可同时满足其中二个,无法三者兼顾,分区容错性是一个最基本的要求。所以系统解决方案。放弃一定可用性保证强一致性。采用最终一致性保证高可用。
业内比较常用的分布式事务解决方案:
- 强一致性的两阶段提交协议,三阶段提交协议,比如seata.
- 最终一致性的可靠事件模式、补偿模式,阿里的 TCC 模式。
简介
XA是由X/Open组织提出的分布式事务的规范。 XA规范主要定义了**(全局)事务管理器?和(局 部)资源管理器(RM)**之间的接口。主流的关系型 数据库产品都是实现了XA接口的。
-
XA接口是双向的系统接口,在事务管理器 (TM)以及一个或多个资源管理器(RM)之 间形成通信桥梁。 -
XA之所以需要引入事务管理器是因为,在分布 式系统中,从理论上讲两台机器理论上无法达 到一致的状态,需要引入一个单点进行协调。 -
由全局事务管理器管理和协调的事务,可以跨 越多个资源(如数据库或JMS队列)和进程。 全局事务管理器一般使用 XA 二阶段提交协议 与数据库进行交互。
组成部分
-
TC (Transaction Coordinator) 事务协调者维护全局和分支事务的状态,驱动全局事务提交或回滚。 -
RM资源管理器(resource manager):用来管理系统资源,是通向事务资源的途径。数据库就是一种资源管理器。资源管理还应该具有管理事务提交或回滚的能力。 -
TM事务管理器(transaction manager):事务管理器是分布式事务的核心管理者。事务管理器与每个资源管理器(resource manager)进行通信,协调并完成事务的处理。事务的各个分支由唯一命名进行标识 Xid 接口 Xid, Xid 接口是 X/Open 事务标识符 XID 结构的 Java 映射。此接口指定三个访问器方法,以检索全局事务格式 ID、全局事务 ID 和分支限定符。Xid 接口供事务管理器和资源管理器使用。此接口对应用程序不可见。
- 总体流程
1.TM向TC注册全局事务 2.调用各资源管理器(即对各数据库数据进行操作), RM向TC注册分支事务, 此时sql会暂存,不会立即提交 3.TM向TC下达全局事务提交, 此时TC会依次执行sql 4.如果有分支事务失败了,则会对之前提交的sql进行回滚
二阶段提交
XA需要两阶段提交: prepare 和 commit.
分布式事务的两阶段提交是把整个事务提交分为 prepare 和 commit 两个阶段。以电商系统为例,分布式系统中有订单、账户和库存三个服务,如下图:
-
第一阶段,事务协调者向事务参与者发送 prepare 请求,事务参与者收到请求后,如果可以提交事务,回复 yes,否则回复 no。 -
第二阶段,如果所有事务参与者都回复了 yes,事务协调者向所有事务参与者发送 commit 请求,否则发送 rollback 请求。
存在的问题
- 本地事务在 prepare 阶段锁定资源,如果有其他事务也要修改 xiaoming 这个账户,就必须等待前面的事务完成。这样就造成了系统性能下降。
- 协调节点单点故障,如果第一个阶段 prepare 成功了,但是第二个阶段协调节点发出 commit 指令之前宕机了,所有服务的数据资源处于锁定状态,事务将无限期地等待。
- 数据不一致,如果第一阶段 prepare 成功了,但是第二阶段协调节点向某个节点发送 commit 命令时失败,就会导致数据不一致。
三阶段提交
为了解决两阶段提交的问题,三阶段提交做了改进:
- 在协调节点和事务参与者都引入了超时机制。
- 第一阶段的 prepare 阶段分成了两步,canCommi 和 preCommit。
如下图:
引入 preCommit 阶段后,协调节点会在 commit 之前再次检查各个事务参与者的状态,保证它们的状态是一致的。但是也存在问题,那就是如果第三阶段发出 rollback 请求,有的节点没有收到,那没有收到的节点会在超时之后进行提交,造成数据不一致。
MySQL对XA的支持
MySQL 从5.0.3开始支持XA分布式事务,且只有InnoDB存储引擎支持。MySQL Connector/J 从5.0.0版本之后开始直接提供对XA的支持。
需要注意的是, 在DTP模型中,mysql属于资源管理器(RM)。而一个完整的分布式事务中,一般会存在多个RM,由事务管理器TM来统一进行协调。因此,这里所说的mysql对XA分布式事务的支持,一般指的是单台mysql实例如何执行自己的事务分支。
XA 事务SQL语法
https://dev.mysql.com/doc/refman/5.7/en/xa-statements.html
XA {START|BEGIN} xid [JOIN|RESUME] //开启XA事务,如果使用的是XA START而不是XA BEGIN,那么不支持[JOIN|RESUME],xid是一个唯一值,表示事务分支标识符
XA END xid [SUSPEND [FOR MIGRATE]] //结束一个XA事务,不支持[SUSPEND [FOR MIGRATE]]
XA PREPARE xid 准备提交
XA COMMIT xid [ONE PHASE] //提交,如果使用了ONE PHASE,则表示使用一阶段提交。两阶段提交协议中,如果只有一个RM参与,那么可以优化为一阶段提交
XA ROLLBACK xid //回滚
XA RECOVER [CONVERT XID] //列出所有处于PREPARE阶段的XA事务
下面是一个简单的msyql XA事务案例,演示了mysql作为全局事务中的一个事务分支,将一行记录插入到一个表中
XA START "xatest";
INSERT INTO USER(id,NAME,age) VALUES(12,"tianshozuhi",22);
XA END "xatest";
XA PREPARE "xatest";
XA COMMIT "xatest";
XA ROLLBACK "xatest"
XA RECOVER
XA事务执行流程
XA事务的状态,按照如下步骤进行展开
-
使用XA START来启动一个XA事务,并把它置于ACTIVE 状态。 -
对于一个ACTIVE状态的 XA事务,我们可以执行构成事务的SQL语句,然后发布一个XA END语句。XA END把事务放入IDLE 状态。 -
对于一个IDLE 状态XA事务,可以执行一个XA PREPARE语句或一个XA COMMIT…ONE PHASE语句:
-
对于一个PREPARED状态的 XA事务,您可以发布一个XA COMMIT语句来提交和终止事务,或者发布XA ROLLBACK来回滚并终止事务。
针对一个给定的客户端连接而言,XA事务和非XA事务(即本地事务)是互斥的。例如,已经执行了”XA START”命令来开启一个XA事务,则本地事务不会被启动,直到XA事务已经被提交或被 回滚为止。相反的,如果已经使用START TRANSACTION启动一个本地事务,则XA语句不能被使用,直到该事务被提交或被 回滚为止。
最后,如果一个XA事务处于ACTIVE状态,是不能直接进行提交的,如果这样做,mysql会抛出异常:
ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed
when global transaction is in the ACTIVE state
XID数据结构
mysql中使用xid来作为一个事务分支的标识符。事实上xid作为事务分支标识符是在XA规范中定义的,
XA规范定义了一个xid有4个部分组成:
- gtrid: 全局事务标识符(global transaction identifier),最大不能超过64字节
- bqual: 分支限定符(branch qualifier),最大不能超过64字节
- data: xid的值,其是 gtrid和bqual拼接后的内容。因为gtrid和bqual最大都是64个字节,因此data的最大长度为128
- formatId: 而formatId的作用就是记录gtrid、bqual的格式,。XA规范建议使用OSI CCR风格定义其格式
通过jdbc操作mysql xa事务
MySQL Connector/J 从5.0.0版本之后开始直接提供对XA的支持,也就是提供了java版本XA接口的实现。意味着我们可以直接通过java代码来执行mysql xa事务。
需要注意的是,业务开发人员在编写代码时,不应该直接操作这些XA事务操作的接口。因为在DTP模型中,RM上的事务分支的开启、结束、准备、提交、回滚等操作,都应该是由事务管理器TM来统一管理。
由于目前我们还没有接触到TM,那么我们不妨做一回"人肉事务管理器",用你智慧的大脑,来控制多个mysql实例上xa事务分支的执行,提交/回滚。通过直接操作这些接口,你将对xa事务有更深刻的认识。
public class MysqlXAConnectionTest {
@Test
public void testTM() throws SQLException {
boolean logXaCommands = true;
Connection connOne = getConnection("jdbc:mysql://182.92.189.235:3310/demo","root", "xiu123");
Connection connTwo = getConnection("jdbc:mysql://182.92.189.235:3310/demo2","root", "xiu123");
XAResource rm1 = getXAResource(logXaCommands,connOne);
XAResource rm2 = getXAResource(logXaCommands,connTwo);
byte[] gtrid = "g12345".getBytes();
int formatId = 1;
try {
byte[] bqual1 = "b00001".getBytes();
Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
rm1.start(xid1, XAResource.TMNOFLAGS);
PreparedStatement ps1 = connOne.prepareStatement("INSERT into `user`(name,age) VALUES ('tianshouzhi','23')");
ps1.execute();
rm1.end(xid1, XAResource.TMSUCCESS);
byte[] bqual2 = "b00002".getBytes();
Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
rm2.start(xid2, XAResource.TMNOFLAGS);
PreparedStatement ps2 = connTwo.prepareStatement("INSERT INTO `order`(order_num) VALUES ('2')");
ps2.execute();
rm2.end(xid2, XAResource.TMSUCCESS);
int rm1_prepare = rm1.prepare(xid1);
int rm2_prepare = rm2.prepare(xid2);
boolean onePhase = false;
if (rm1_prepare == XAResource.XA_OK
&& rm2_prepare == XAResource.XA_OK
) {
rm1.commit(xid1, onePhase);
rm2.commit(xid2, onePhase);
} else {
rm1.rollback(xid1);
rm1.rollback(xid2);
}
} catch (XAException e) {
e.printStackTrace();
}
}
private Connection getConnection(String url,String name,String password) throws SQLException {
return DriverManager.getConnection(url, name, password);
}
private XAResource getXAResource(boolean logXaCommands,Connection connection) throws SQLException {
XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection) connection, logXaCommands);
return xaConn1.getXAResource();
}
在这个案例中,演示了2个RM的情况下分布式事务的工作流程。因为我们充当了"人肉事务管理器”TM,因此很多本应该由TM来处理的工作处理细节也直接体现在上述代码中,如:生成全局事务id和分支事务id、在RM上开启事务分支、两阶段提交等。虽然我们自己作为"人肉事务管理器”是很不可靠的,但是上述代码可以让我们了解一个TM内部的主要工作流程是怎样的。
在实际开发中,代码绝不会像上表面那样复杂,因为我们通常都会使用第三方或者容器提供的TM功能,因此在操作分布式事务时,代码可以得到极大的简化。
MySQL Connector/J XA事务支持源码简单分析**
最后,我们对上述源码进行一下简单的分析。在前面直接使用mysql命令操作的时候,我们通过"XA START xid”等XA命令来执行XA事务。而在上述java代码中,我们是获取了一个普通的链接Connection之后,封装成了MysqlXAConnection 。如下:
com.mysql.jdbc.jdbc2.optional.MysqlXAConnection
public class MysqlXAConnection extends MysqlPooledConnection implements XAConnection, XAResource {
private com.mysql.jdbc.Connection underlyingConnection;
private Log log;
protected boolean logXaCommands;
public MysqlXAConnection(com.mysql.jdbc.Connection connection, boolean logXaCommands) throws SQLException {
super(connection);
this.underlyingConnection = connection;
this.log = connection.getLog();
this.logXaCommands = logXaCommands;
}
}
可以看到,MysqlXAConnection本身就实现了XAResource 接口,因此当调用getXAResource()方法时,返回的就是其自己
com.mysql.jdbc.jdbc2.optional.MysqlXAConnection#getXAResource
public XAResource getXAResource() throws SQLException {
return this;
}
之后,我们调用XAResource的start方法来开启XA事务。start方法源码如下所示:
com.mysql.jdbc.jdbc2.optional.MysqlXAConnection#start
public void start(Xid xid, int flags) throws XAException {
StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);
commandBuf.append("XA START ");
appendXid(commandBuf, xid);
switch (flags) {
case TMJOIN:
commandBuf.append(" JOIN");
break;
case TMRESUME:
commandBuf.append(" RESUME");
break;
case TMNOFLAGS:
break;
default:
throw new XAException(XAException.XAER_INVAL);
}
dispatchCommand(commandBuf.toString());
this.underlyingConnection.setInGlobalTx(true);
}
可以看到,当我们调用MysqlXAConnection的start方法时,实际上就是执行了一个”XA START xid [JOIN|RESUME]”命令而已,和我们直接在命令行中的操作是一样一样的,只不过通过封装简化了我们的操作。
对于MysqlXAConnection的end、prepare、commit、rollback等方法,也都是是类似的,不再赘述。
最后提示, MySQL Connector/J 中提供的XA操作接口,如上面提到的XAConnection、XAResource、Xid等,实际上都遵循了JTA规范。
分布式事务解决方案
seata框架(强一致性)
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。其中AT 模式是Seata 主推的模式,是基于二阶段提交来实现的。
以用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:
- 仓储服务(storage-service):对给定的商品扣除仓储数量。
- 订单服务(order-service):根据采购需求创建订单。
- 帐户服务(account-service):从用户帐户中扣除余额。
安装seata服务
-
下载seata: https://github.com/seata/seata/releases -
seata相关脚本 -
解压服务 tar -zxvf seata-server-1.5.1.tar.gz -C /usr/local/seata/ -
修改配置执行初始化脚本
初始化mysql脚本
- 启动服务 sh seata-server.sh 服务安装完成。
用例
项目架构如下:
- seata server 1.5.1
- dubbo
- spring boot
项目以及相关sql
AT模式下全局事务处理
AT模式需要依赖不同的业务数据,所以需要在不同的微服务对应的数据库下创建undo_log表,用于全局事务回滚。
测试正常结果
localhost:8084/order/create
请求参数
{"id":null,"userId":"1","commodityCode":"P001","count":2,"money":140}
响应参数
{
"code": "0",
"msg": "success",
"data": "下单成功!"
}
-
金额扣减成功 -
库存扣减成功x
某个服务出错 调用
调用结果如下:
{
"code": "1",
"msg": "error",
"data": "扣款失败,可能是余额不足!"
}
同时订单创建失败、库存和金额都没有扣减
TCC模式下全局事务处理
还是以下单操作为例。
TCC 模式,不依赖于底层数据资源的事务支持,比较灵活但是
- 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
- 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
- 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
@LocalTCC
public interface OrderTCCService {
@TwoPhaseBusinessAction(name = "create", commitMethod = "confirm", rollbackMethod = "cancel")
void create(@BusinessActionContextParameter(paramName = "order") Order order);
void prepare(@BusinessActionContextParameter(paramName = "userId") String userId,
@BusinessActionContextParameter(paramName = "money") int money);
boolean confirm(BusinessActionContext context);
boolean cancel(BusinessActionContext context);
}
- @LocalTCC:作用于服务接口上,表示实现该接口的实现类被 seata 来管理,seata 根据事务的状态,自动调用我们定义的方法,如果没问题则调用 Commit 方法,否则调用 Rollback 方法。
- @TwoPhaseBusinessAction:该注解用在接口的 Try 方法上,name 为 tcc 方法的 bean 名称,需要全局唯一,一般写方法名即可;commitMethod指定事务成功后的commit方法;rollbackMethod 指定事务失败后的rollback方法。
- @BusinessActionContextParameter:?该注解用来修饰 Try 方法的入参,被修饰的入参可以在 Commit 方法和 Rollback 方法中通过 BusinessActionContext 获取。
SAGA 模式事务处理
Saga是这一篇数据库论文saga提到的一个方案。其核心思想是将长事务拆分为多个本地短事务,由Saga事务协调器协调,如果正常结束那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。
SAGA事务的特点:
- 并发度高,不用像XA事务那样长期锁定资源
- 需要定义正常操作以及补偿操作,开发量比XA大
- 一致性较弱,对于转账,可能发生A用户已扣款,最后转账又失败的情况
最终一致性
? 基于上述CAP原理,市面上大部分的微服务应用系统都是满足AP,即放弃强一致性通过最终一致性从而保证高可用。针对最终一致性有如下三种模式。
-
可靠事件模式 -
业务补偿模式 -
TCC模式 可靠事件模式
可靠消息模式属于事件驱动架构,还是已上述下单模式为例,如下图
上述事件消费无法正确消费或者重复问题,所以可靠事件模型的特点在于保证可靠事件投递和避免重复消费。
本地事件表
? 本地消息事件表方法将事件和业务数据保存在同一个数据库中,使用一个额外的事件恢复服务来恢复事件,有本地事务保证更新业务和发布事件的原子性。
本地事件表的说明如下: 1、微服务在同一个本地事务中记录业务数据和事件 2、当微服务实时发布一个事件时,立即通知关联的业务服务,如果事件发布成功则立即删除记录的事件。 3、事件恢复服务定时从本地事务表中恢复未发布成功的事件,并重新发布,直到重新发布成功时才删除记录的事件。 ??其中操作 2 主要是为了增加发布事件的实时性,操作 3 是为了保证事件一定被发布。
外部事件表
外部事件表方法将事件持久化到外部的事件系统,事件系统需要提供实时事件消息服务以接收微服务发布的事件,同时事件系统还需要提供事件恢复服务来确认和恢复事件,如下图所示:
外部事件表的说明如下: 1、业务服务在事务提交前进行,通过实时事件服务向事件服务系统请求发送事件,事件系统只记录事件并不真正发送。 2、业务服务提交后,通过实时事件服务向事件系统确认事件需要被成功发送,确认“事件需要被成功发送”后事件系统才真正将事件发布到消息代理中。 3、业务服务在业务回滚时,通过实时事件服务取消事件系统中的事件。 4、如果业务服务在发送确认或取消之前停止了服务怎么办?事件系统的事件恢复服务会定期找到未确认发送的事件,并向业务服务查询事件状态,根据业务服务返回的状态决定事件是发布还是取消。
业务补偿
业务异常:业务逻辑产生错误的情况,如账户余额不足、商品库存不足等。 技术异常:非业务逻辑产生的异常,如网络连接异常、网络超时等。 ??补偿模式使用一个额外的协调服务(补偿框架)来协调各个需要保证一致性的微服务,协调服务按顺序调用各个服务,如果某个微服务调用异常(包括业务异常和技术异常),则取消之前所有已经调用成功的微服务。
还是以上述下单为例,所有业务补偿,其实是相关服务都需要失败后的回滚业务接口
比如 订单服务需要有创建订单接口外 还需要有取消订单接口(失败的回滚操作),库存和账户服务除了有扣减库存和扣减金额外还需要提供对应的恢复库存和恢复金额的操作。统一使用补偿框架进行成功提交或者失败回滚调用。
|