//我们再来接上面的 来讲 当执行业务sql的时候 seata 会给我们点到这里
public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
如果不是分布式事务
if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
// Just work as original statement
[以前的 这个就是原生的]
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
/// 创建一个sql识别器
if (sqlRecognizer == null) {
sqlRecognizer = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
statementProxy.getConnectionProxy().getDbType());
}
Executor<T> executor = null;
if (sqlRecognizer == null) {
executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
} else {
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case UPDATE:
executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
break;
}
}
T rs = null;
try {
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException)ex;
}
return rs;
}
也就是说当执行业务sql的时候 ,会执行到这里 如果是不是全局事务的话,以前是什么样的 现在还是什么洋的
如果是全局事务的话
先创建一个sql识别器 根据不同的sql类型来执行对应的sql
然后 去执行对应sql doExecute 我们此时的连接肯定是自动提交的
@Override
public T doExecute(Object... args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
if (connectionProxy.getAutoCommit()) {
return executeAutoCommitTrue(args);
} else {
return executeAutoCommitFalse(args);
}
}
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
.. 吧连接设置为false
connectionProxy.setAutoCommit(false);
// 这里是一个线程
return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> {
T result = executeAutoCommitFalse(args);
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback();
}
throw e;
} finally {
((ConnectionProxy) connectionProxy).getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
会吧线程传进这个方法
死循环 如果 callable.call(); 正常通过就结束了 如果不是正常通过有异常了 会触发重试机制
会有什么异常 [竞争锁 就得看对应的线程会抛出什么异常]
protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
LockRetryController lockRetryController = new LockRetryController();
while (true) {
try {
//
return callable.call();
} catch (LockConflictException lockConflict) {
onException(lockConflict);
lockRetryController.sleep(lockConflict);
} catch (Exception e) {
onException(e);
throw e;
}
}
}
T result = executeAutoCommitFalse(args);
connectionProxy.commit();
..我们看下这个线程
这个方法没有什么 就是得到一个前置镜像
protected T executeAutoCommitFalse(Object[] args) throws Exception {
TableRecords beforeImage = beforeImage();
// 执行业务代码
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
// 得到后置对象
TableRecords afterImage = afterImage(beforeImage);
//生成这个sqlUndoLog 记录
prepareUndoLog(beforeImage, afterImage);
return result;
}
connectionProxy.commit(); 然后注册分支事务
register(); ——--》》
注册失败会抛一个异常 recognizeLockKeyConflictException(e, context.buildLockKeys());
这个异常就会在上面的死循环中被捕获 处理的逻辑就是
lockRetryController.sleep(lockConflict);
重试30次数 每次10s
如果你重试这么多次还是没成功的话 我就抛异常处理
这里的逻辑大概就是这样的
我这里开了2个线程用来模拟全局锁
thread1 线程1
SELECT * FROM t_user WHERE ID='14'
update t_user set username='zhangsan' where id='14'
SELECT * FROM t_user WHERE ID='14'
thread2 线程2
SELECT * FROM t_user WHERE ID='14'
update t_user set username='LISI' where id='14'
SELECT * FROM t_user WHERE ID='14'
第一个线程的意思是 将id=14 的username 改成zhangsan
线程2吧id =14改成lisi
如果线程1执行完业务代码update 之后后置镜像还没有执行
此时线程2 就开始执行 前置镜像 业务代码 后置镜像 得到结果是username =lisi
这个时候线程1 再来进行查询的时候 得到的username 就是zhangsan 就会出现数据不一致的现象
所以seata 此时关闭了自动提交事务 意思是让
SELECT * FROM t_user WHERE ID='14'
update t_user set username='LISI' where id='14'
SELECT * FROM t_user WHERE ID='14'
前置镜像 业务代码 后置镜像 这3条语句保证原子性操作
代码中,如果锁冲突了 我会去申请锁冲突[锁冲突情况下怎么去进行 重试 通过什么来保证重试]
|