IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> rocketmq 常用Api(二) -> 正文阅读

[大数据]rocketmq 常用Api(二)

目录

事务消息

流程图

简单应用

重试机制

顺序消费


事务消息

分布式系统中的事务可以用2pc(两阶段提交、tcc(补偿事务)来解决分布式系统中的消息原子性RocketMq4.3+ 提供分布式事务功能,通过Rocketmq事务消息能达到分布式事务的最终一致性

第一阶段尝试提交

第二阶段确认ok

数据库就是2pc 提交的数据不会立即生效。再次确认的时候才会持久化,如果给的rollback就会把数据丢弃

tcc? try——confirm——cancel

RocketMq实现方式

Half Message:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中

检查事务状态:Broker会开启一个定时任务,在消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。

超时:如果超过回查次数,默认回滚消息

TransactionListener

executeLocalTransaction半消息发送成功触发来执行本地事务

checkLocalTransaction

broker将发送检查消息来检查事务状态,并将调用方法来获取本地事务状态

本地事务执行状态

public enum LocalTransactionState {
    COMMIT_MESSAGE,//执行事务成功,确认提交
    ROLLBACK_MESSAGE,//回滚消息,broker端回删除半消息
    UNKNOW;//暂时为位置状态,等待broker回查
}

流程图

简单应用

TransactionMQProducer producer = new TransactionMQProducer("xxoogp");
//回调
producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                //执行本地事务
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                //Broker端回调检查,检查事务
                return LocalTransactionState.COMMIT_MESSAGE;
//                return LocalTransactionState.UNKNOW;
//                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });

重试机制

producer

默认超时时间

//异步发送时,重试次数,默认为2
producer.setRetryTimesWhenSendAsyncFailed(1);
//同步发送时,重试次数,默认为2
producer.setRetryTimesWhenSendFailed(1);
//是否向其他broker发送请求 默认为false
producer.setRetryAnotherBrokerWhenNotStoreOK(true);

consumer

//消费超时,单位分钟
consumer.setConsumeTimeout(1);
//发送ack,消费失败
RECONSUME_LATER

broker投递

只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息不重试,重投使用messageDelayLevel

默认值

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 19m 20m 30m 1h 2h

顺序消费

FIFO是topic里的queue来维护的。是一个几乎无限大的数组。每生成一个topic就会默认在里面创建4个queue。

为了保证FIFO,所以就要同步添加到同一topic的同一个queue里去。所以就有queue选择器

MessageQueueSelector

producer.send(message,
        //queue选择器,向topic中的哪个queue去写消息
        new MessageQueueSelector() {
            @Override
            //手动 选择一个queue  list 所有的queue列表,msg具体要发的消息,o,外面send传的后一个参数
            public MessageQueue select(List<MessageQueue> list, Message msg, Object o) {
                return null;
            }
        },args,2000);

自定义实现这个方法选择queue。也有封装的三种

基于附带参数的hash

public class SelectMessageQueueByHash implements MessageQueueSelector {
    public SelectMessageQueueByHash() {
    }

    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value %= mqs.size();
        return (MessageQueue)mqs.get(value);
    }
}

基于随机

public class SelectMessageQueueByRandom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());

    public SelectMessageQueueByRandom() {
    }

    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = this.random.nextInt(mqs.size());
        return (MessageQueue)mqs.get(value);
    }
}

开源版本没有实现的机房策略

public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
    private Set<String> consumeridcs;

    public SelectMessageQueueByMachineRoom() {
    }

    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return null;
    }

    public Set<String> getConsumeridcs() {
        return this.consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}

以上三种封装选择去topic下哪个queue里去传输数据

在消费者的时候

consumer.registerMessageListener()

MessageListenerConcurrently 并发消息/多线程

MessageListenerOrderly 顺序消费,对一个queue开启一个线程,多个queue多个线程

consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        return null;
    }
});

因为是多线程,所以就有一些线程池的参数可以配置

//最大开启消费线程数
consumer.setConsumeThreadMax(4);
//最小线程数
consumer.setConsumeThreadMin(1);

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-04 13:30:42  更:2021-12-04 13:32:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 13:59:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码