IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RocketMQ -> 正文阅读

[大数据]RocketMQ

架构图

安装

https://www.jianshu.com/p/0e7dad0bed0e
两种方式:
1. 下载源码,maven打包后,进行安装
2. 直接下载二进制码,减压后就行

/usr/local/rocketmq
/root/logs/rocketmq

rocke有9876
非vip通道端口:10911
vip通道端口:10909
10909是VIP通道对应的端口,在JAVA中的消费者对象或者是生产者对象中关闭VIP通道即可无需开放10909端口
如果是broker集群的话,还要开放10912,否则master的消息将无法复制到slave节点


集成

https://www.cnblogs.com/qdhxhz/p/11109696.html

两种方式:
1.引用rocketmq-client,自己进行封装
2.下载rocketmq-sprintboot集成代码,因为没有上传中央仓库,需要自己install,才可以引用


参考:
https://www.jianshu.com/p/12aa00215dd6
https://blog.csdn.net/qq_16241519/article/details/103926356
https://www.cnblogs.com/smail-bao/p/6905460.html?utm_source=itdadao&utm_medium=referral

RemotingTooMuchRequestException: sendDefaultImpl call timeout


namesrvAddr = 192.168.118.162:9876
brokerIP1 = 192.168.118.162

nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.properties &


三种发送模式

?	同步发送 sync
rocketMQTemplate.syncSend("topic-name", "send sync message !");

?	异步发送 async
发送消息采用异步发送模式,消息发送后立刻返回,当消息完全完成发送后,会调用回调函数sendCallback来告知发送者本次发送是成功或者失败。异步模式通常用于响应时间敏感业务场景,即承受不了同步发送消息时等待返回的耗时代价。
   public void async() {
        rocketMQTemplate.asyncSend("topic-name", "send async message!", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("send successful");
            }

            @Override
            public void onException(Throwable throwable) {
                log.info("send fail; {}", throwable.getMessage());
            }
        });
}

?	直接发送 one-way
rocketMQTemplate.sendOneWay("topic-name", "send one-way message");
发送端发送完消息后会立即返回,不会等待来自broker的ack来告知本次消息发送是否完全完成发送。这种方式吞吐量很大,但是存在消息丢失的风险,所以其适用于不重要的消息发送,比如日志收集。

1.	当发送的消息不重要时,采用one-way方式,以提高吞吐量;
2.	当发送的消息很重要是,且对响应时间不敏感的时候采用sync方式;
3.	当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;


延迟消息

RocketMQ 目前只支持固定精度的定时消息。
延迟级别(18个等级)
1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h  Message message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.syncSend(topic, message,1000,2);//表示延时5秒


去重

1. 幂等性
2. 保存已处理的信息,新消息进行判重
3. 如果消息有顺序性,则可以添加时间戳,客户端维护一个最后处理的时间戳,如果是之前消息则抛弃,否则处理更新


幂等性

业务 关联

查询:天然的幂等性
删除:删除多次也是删除成功
添加:
1. 业务主键 的分析
2. DB 对此表创建唯一索引
3. 插入的时候,先判断 ,然后在插入
- 查询的时候用分布式锁子
- 更新的时候查询 
insert into sys_user(name,password) select 'admin', '123456' from dual where not exists(select 1 from sys_user where name='admin');
更新:
1. 乐观锁,添加一个version
- 查询出的version=1,update的时候 需要将version添加到where version=1为条件
update sys_user set age=age+1, version=version+1 where version=xx
这个版本号也可以使用更新时间来判断。
2. 更新前查询
- 分布式锁
- 悲观锁,select for update
- 获取数据,判断是否修改

备注:insert,update 及时没有 业务唯一序列号,也需要手动添加一个。


拉取消息

推拉是站在MQ端。

push 推,MQ给客户端推送
pull 拉,客户端主动定时去轮训MQ,RocketMQMessageListener就是一种拉模式

实际没有推模式,push是通过pull来实现。


顺序消息

保证发送的是同一个队列。
1. 所有消息都放在同一个队列
2. 将一个订单的消息放在同一个队列(√)

发送在同一个消息队列中,根据先到先出的原则,消息就会按照发送的顺序挨个消费。及时是多个消费者也是挨个消费。

这是默认的消息队列选择器:
public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}


事务消息

?	设置事务监听
package com.xxx.listener;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

@Slf4j
@RocketMQTransactionListener(txProducerGroup = "rocket")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("本地事务和消息发送:" + JSON.toJSONString(message));

        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        System.out.println("回查信息:" + JSON.toJSONString(message));

        return RocketMQLocalTransactionState.COMMIT;
    }
}

?	发送消息
package com.xxx;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Slf4j
@SpringBootApplication
public class SpringBootRocketMqApplication {
    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext context = SpringApplication.run(SpringBootRocketMqApplication.class, args);

        RocketMQTemplate template = context.getBean(RocketMQTemplate.class);
        while (true) {
            String msg = "demo msg test";
            log.info("开始发送消息:"+msg);

            Message message = MessageBuilder.withPayload(msg).build();
            TransactionSendResult result = template.sendMessageInTransaction("rocket", "ts", message, null);
            log.info("消息发送响应信息:"+result.toString());

            Thread.sleep(10);
        }
    }
}


广播消息

发送消息不变,消费端设置消息类型(默认负载均衡)
@RocketMQMessageListener(messageModel=MessageModel.BROADCASTING)


end

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-09 18:28:13  更:2022-04-09 18:29:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 13:08:58-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码