IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> seata源码<2> -> 正文阅读

[大数据]seata源码<2>

	//我们再来接上面的 来讲 当执行业务sql的时候 seata 会给我们点到这里
    public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
	 如果不是分布式事务
        if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
            // Just work as original statement
            [以前的 这个就是原生的]
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
/// 创建一个sql识别器
        if (sqlRecognizer == null) {
            sqlRecognizer = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    statementProxy.getConnectionProxy().getDbType());
        }
        Executor<T> executor = null;
        if (sqlRecognizer == null) {
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
        } else {
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                    break;
            }
        }
        T rs = null;
        try {
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException)ex;
        }
        return rs;
    }
也就是说当执行业务sql的时候 ,会执行到这里 如果是不是全局事务的话,以前是什么样的 现在还是什么洋的

如果是全局事务的话
先创建一个sql识别器 根据不同的sql类型来执行对应的sql
然后 去执行对应sql  doExecute 我们此时的连接肯定是自动提交的
    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }
 
    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
        ..  吧连接设置为false
            connectionProxy.setAutoCommit(false);
//  这里是一个线程
            return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> {
                T result = executeAutoCommitFalse(args);
                connectionProxy.commit();
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            ((ConnectionProxy) connectionProxy).getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }
 会吧线程传进这个方法
 死循环 如果 callable.call(); 正常通过就结束了 如果不是正常通过有异常了 会触发重试机制
 会有什么异常 [竞争锁  就得看对应的线程会抛出什么异常]
        protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
            LockRetryController lockRetryController = new LockRetryController();
            while (true) {
                try {
                //  
                    return callable.call();
                } catch (LockConflictException lockConflict) {
                    onException(lockConflict);
                    lockRetryController.sleep(lockConflict);
                } catch (Exception e) {
                    onException(e);
                    throw e;
                }
            }
        }


     T result = executeAutoCommitFalse(args);
    connectionProxy.commit();
    
..我们看下这个线程
这个方法没有什么 就是得到一个前置镜像
    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        TableRecords beforeImage = beforeImage();
        // 执行业务代码
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
       // 得到后置对象
        TableRecords afterImage = afterImage(beforeImage);
       //生成这个sqlUndoLog  记录
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }
connectionProxy.commit();  然后注册分支事务
register(); ——--》》
注册失败会抛一个异常   recognizeLockKeyConflictException(e, context.buildLockKeys());


这个异常就会在上面的死循环中被捕获  处理的逻辑就是

 lockRetryController.sleep(lockConflict);
 重试30次数 每次10s
 如果你重试这么多次还是没成功的话 我就抛异常处理

 

在这里插入图片描述
在这里插入图片描述

这里的逻辑大概就是这样的

在这里插入图片描述

我这里开了2个线程用来模拟全局锁
  
thread1 线程1
SELECT * FROM  t_user WHERE ID='14' 
update  t_user set username='zhangsan' where id='14'
SELECT * FROM  t_user WHERE ID='14' 

thread2 线程2
SELECT * FROM  t_user WHERE ID='14' 
update  t_user set username='LISI' where id='14'
SELECT * FROM  t_user WHERE ID='14' 
第一个线程的意思是  将id=14 的username 改成zhangsan
线程2吧id =14改成lisi

如果线程1执行完业务代码update 之后后置镜像还没有执行
此时线程2 就开始执行 前置镜像 业务代码 后置镜像  得到结果是username =lisi 

这个时候线程1 再来进行查询的时候 得到的username 就是zhangsan 就会出现数据不一致的现象


所以seata 此时关闭了自动提交事务 意思是让
SELECT * FROM  t_user WHERE ID='14' 
update  t_user set username='LISI' where id='14'
SELECT * FROM  t_user WHERE ID='14' 
前置镜像 业务代码 后置镜像 这3条语句保证原子性操作
代码中,如果锁冲突了 我会去申请锁冲突[锁冲突情况下怎么去进行 重试 通过什么来保证重试]

 

 
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-05 11:25:14  更:2022-05-05 11:26:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 8:58:27-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码