IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RocketMQ系列之事务消息 -> 正文阅读

[大数据]RocketMQ系列之事务消息

事务这个概念和内容这里就不提了哈,我们这里直接看RocketMQ怎么实现事务消息的,如果真的对事务不了解的,可以看一下这篇:

分布式事务系列之入门_阿小冰的博客-CSDN博客_分布式事务分布式事务系列之入门https://blog.csdn.net/qq_38377525/article/details/123159026


事务消息发送和消费

下面的架子是按照之前的一个框架继续开发的,有必要的话,可以参考一下:

RocketMQ系列之消息发送/消费初体验_阿小冰的博客-CSDN博客RocketMQ系列之消息发送/消费初体验https://blog.csdn.net/qq_38377525/article/details/123253718?1、发送创建订单的事务消息,预下单操作

这里定义一个订单的实体类Order,包含订单ID、订单标题两个属性即可

编写事务的模拟接口,代码如下:

@RestController
public class TransactionalController {
    @Autowired
    private Source source;

    @GetMapping("/transactional")
    public String tramsactional(){
        Order order=new Order("1","上海浦东");
        String transactionId=UUID.randomUUID().toString();
        MessageBuilder builder = MessageBuilder.withPayload(order).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId);
        Message message=builder.build();
        source.output().send(message);
        return "order is ok";
    }
}

2、配置文件

server.port=8070

spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=producer-demo-group

3、创建事务监听器

@RocketMQTransactionListener(txProducerGroup = "OrderTransactionGroup")
public class TransactionMsgListener implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            //获取前面订单生成的事务id
            String transactionId=(String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            //以事务ID为主键,执行本地事务
            Order order=(Order) message.getPayload();
            boolean result = this.saveOrder(order, transactionId);
            return result?RocketMQLocalTransactionState.COMMIT:RocketMQLocalTransactionState.ROLLBACK;
        }catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        //获取事务ID
        String transactionId=(String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        //以事务ID为主键,查询本地事务执行情况
        if(isSuccess(transactionId)){
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }

    /**
     * @Description: 将事务ID设置为唯一主键,调用数据库insert
     */
    private boolean saveOrder(Order order,String transactionId){
        return true;
    }

    /**
     * @Description: 查询数据库
     */
    private boolean isSuccess(String transactionId){
        return true;
    }
}

使用@RocketMQTransactionListener注解用于接收本地事务的监听,txProducerGroup是事务组名称,和前面定义的OrderTransactionGroup保持一致,其中这个接口还有两个实现方法,也在上面的代码有所体现

  • executeLocalTransaction:执行本地事务,在消息发送成功会回调执行,一单事务提交成功,下游应用的消费者就能收到该消息
  • checkLocalTransaction:检查本地事务执行状态,如果executeLocalTransaction方法中返回的状态是位置UNKNOWN或者为返回的状态,就会默认在预处理发送的一分钟后由Broker通知Producer检查本地事务,在Producer中回调本地事务监听器中的checkLocalTransaction方法,检查本地事务时,可以根据事务ID查询本地事务的状态,再返回具体事务状态给Broker

4、消费者不变,和上面提供链接中的消费者保持一致即可,然后启动验证即可


事务消息的底层原理

RocketMQ采用的2PC的方案,第一阶段生产者向Broker发送预处理消息,此时消息其实还没有投递出去,这个时候消费者还不能消费,第二阶段生产向Broker发送提交或回滚消息,具体流程如下:

  • 发送预处理消息成功后,开始执行本地事务
  • 如果本地事务执行成功,发送提交请求提交事务消息,消息会投递给消费者

?

  • 如果本地事务执行失败,发送回滚请求回滚事务消息,消息不会投递给消费者,看上图

?

  • ?如果本地事务状态未知,因为网络故障或者生产者挂掉了,Broker没有收到二次确认的相关信息,就会由Broker端发送请求向生产者进行回查,询问并确认提交还是回滚,如果状态一直都是未确认,那就需要人工干预程序了,看上图
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-06 13:08:12  更:2022-03-06 13:09:41 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 20:09:42-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码